Expressivity and composability of Spark

spark
Author

Youfeng Zhou

Published

November 21, 2022

With the low-level RDD API (unstructured)

from pyspark import SparkContext

sc = SparkContext('local')
dataRDD = sc.parallelize([('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)])
dataRDD.first()
('Brooke', 20)
dataRDD.collect() # only use when the dataset is small
[('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)]
dataRDD.take(2) # use `take(n)` to get the first n rows
[('Brooke', 20), ('Denny', 31)]
mappedRDD = dataRDD.map(lambda x: (x[0], (x[1], 1)))
mappedRDD.collect()
[('Brooke', (20, 1)),
 ('Denny', (31, 1)),
 ('Jules', (30, 1)),
 ('TD', (35, 1)),
 ('Brooke', (25, 1))]
reducedRDD = mappedRDD.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
reducedRDD.collect()
[('Brooke', (45, 2)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (35, 1))]
avgRDD = reducedRDD.map(lambda x: (x[0], x[1][0] / x[1][1]))
avgRDD.collect()
[('Brooke', 22.5), ('Denny', 31.0), ('Jules', 30.0), ('TD', 35.0)]
sc.stop()
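To make the per-key bookkeeping in the pipeline above explicit, here is a plain-Python sketch of the same map → reduceByKey → map steps (no Spark required; `data` mirrors the contents of dataRDD):

```python
data = [('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)]

# map: (name, age) -> (name, (age, 1)), pairing each age with a count of 1
mapped = [(name, (age, 1)) for name, age in data]

# reduceByKey: combine values that share a key by summing ages and counts
reduced = {}
for name, (age, count) in mapped:
    total_age, total_count = reduced.get(name, (0, 0))
    reduced[name] = (total_age + age, total_count + count)

# map: (name, (age_sum, count)) -> (name, average age)
averages = {name: s / c for name, (s, c) in reduced.items()}
print(averages)  # {'Brooke': 22.5, 'Denny': 31.0, 'Jules': 30.0, 'TD': 35.0}
```

The sketch shows why the RDD version is harder to read: the intent (average age per name) is buried inside the sum/count tuples that we have to thread through every step by hand.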

With high-level DSL operators and the DataFrame API (structured)

DSL: domain specific language

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName('AvgAges').getOrCreate()
data_df = spark.createDataFrame([('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)], 
                                ['name', 'age'])
data_df.show()
+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+
data_df.show(1)
+------+---+
|  name|age|
+------+---+
|Brooke| 20|
+------+---+
only showing top 1 row
data_df.take(2) # Get the first 2 rows
[Row(name='Brooke', age=20), Row(name='Denny', age=31)]
data_df.tail(2) # Get the last 2 rows
[Row(name='TD', age=35), Row(name='Brooke', age=25)]
avg_df = data_df.groupBy('name').agg(avg('age'))
avg_df.show()
+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+
Note

This version is far more expressive and concise than the RDD version: it states what to compute (group by name, average the ages) rather than how to compute it.

spark.stop()