# Expressivity and composability of Spark.
#
# Part 1: with the low-level RDD API (unstructured).
# Compute the average age per name from (name, age) pairs using the
# classic map -> reduceByKey -> map pipeline.

from pyspark import SparkContext

sc = SparkContext('local')

dataRDD = sc.parallelize([('Brooke', 20), ('Denny', 31), ('Jules', 30),
                          ('TD', 35), ('Brooke', 25)])

dataRDD.first()
# ('Brooke', 20)

# Only use collect() when the dataset is small -- it pulls every
# partition back to the driver.
dataRDD.collect()
# [('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)]

# Use take(n) to get the first n rows.
dataRDD.take(2)
# [('Brooke', 20), ('Denny', 31)]

# Pair each age with a count of 1 so one reduce can sum both the ages
# and the number of records per name.
mapedRDD = dataRDD.map(lambda x: (x[0], (x[1], 1)))
mapedRDD.collect()
# [('Brooke', (20, 1)),
#  ('Denny', (31, 1)),
#  ('Jules', (30, 1)),
#  ('TD', (35, 1)),
#  ('Brooke', (25, 1))]

# Sum the (age_total, count) tuples per key.
reducedRDD = mapedRDD.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
reducedRDD.collect()
# [('Brooke', (45, 2)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (35, 1))]

# Divide the age total by the count to get the average per name.
mapedRDD = reducedRDD.map(lambda x: (x[0], x[1][0] / x[1][1]))
mapedRDD.collect()
# [('Brooke', 22.5), ('Denny', 31.0), ('Jules', 30.0), ('TD', 35.0)]

sc.stop()
# Part 2: with high-level DSL operators and the DataFrame API (structured).
# DSL: domain-specific language.  Same average-age-per-name computation,
# expressed declaratively with groupBy/agg instead of hand-rolled reduces.

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName('AvgAges').getOrCreate()

data_df = spark.createDataFrame(
    [('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)],
    ['name', 'age'])

data_df.show()
# +------+---+
# |  name|age|
# +------+---+
# |Brooke| 20|
# | Denny| 31|
# | Jules| 30|
# |    TD| 35|
# |Brooke| 25|
# +------+---+

data_df.show(1)
# +------+---+
# |  name|age|
# +------+---+
# |Brooke| 20|
# +------+---+
# only showing top 1 row

# Get the first 2 rows as Row objects.
data_df.take(2)
# [Row(name='Brooke', age=20), Row(name='Denny', age=31)]

# Get the last 2 rows.
data_df.tail(2)
# [Row(name='TD', age=35), Row(name='Brooke', age=25)]

# Average age per name, expressed declaratively.
avg_df = data_df.groupBy('name').agg(avg('age'))
avg_df.show()
# +------+--------+
# |  name|avg(age)|
# +------+--------+
# |Brooke|    22.5|
# | Denny|    31.0|
# | Jules|    30.0|
# |    TD|    35.0|
# +------+--------+

spark.stop()