from pyspark import SparkContext
sc = SparkContext('local')
Expressivity and composability of Spark
With low-level RDD API (unstructured)
dataRDD = sc.parallelize([('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)])
dataRDD.first()
('Brooke', 20)
dataRDD.collect() # only use when the dataset is small
[('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)]
dataRDD.take(2) # use `take(n)` to get the first n rows
[('Brooke', 20), ('Denny', 31)]
mapedRDD = dataRDD.map(lambda x: (x[0], (x[1], 1)))
mapedRDD.collect()
[('Brooke', (20, 1)),
 ('Denny', (31, 1)),
 ('Jules', (30, 1)),
 ('TD', (35, 1)),
 ('Brooke', (25, 1))]
reducedRDD = mapedRDD.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
reducedRDD.collect()
[('Brooke', (45, 2)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (35, 1))]
mapedRDD = reducedRDD.map(lambda x: (x[0], x[1][0]/x[1][1]))
mapedRDD.collect()
[('Brooke', 22.5), ('Denny', 31.0), ('Jules', 30.0), ('TD', 35.0)]
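The three RDD steps above (map to `(age, 1)`, merge pairs per key, divide sum by count) can be traced without Spark. The following is a plain-Python sketch of the same arithmetic, not of how Spark executes it: it ignores partitioning and distribution, and the names `merge`, `mapped`, `reduced`, and `averages` are illustrative only. Carrying `(sum, count)` pairs matters because the function passed to `reduceByKey` must be associative and commutative, which pairwise addition is and direct averaging is not.

```python
data = [('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)]

# Step 1 (map): pair each age with a count of 1.
mapped = [(name, (age, 1)) for name, age in data]

# Step 2 (reduceByKey): merge the (sum, count) pairs that share a key.
def merge(x, y):
    # Hypothetical helper mirroring the lambda passed to reduceByKey.
    return (x[0] + y[0], x[1] + y[1])

reduced = {}
for name, pair in mapped:
    reduced[name] = merge(reduced[name], pair) if name in reduced else pair

# Step 3 (map): divide each sum by its count to get the average age.
averages = {name: total / count for name, (total, count) in reduced.items()}
print(averages)
```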
sc.stop()
With high-level DSL operators and the DataFrame API (structured)
DSL: domain-specific language
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
spark = SparkSession.builder.appName('AvgAges').getOrCreate()
data_df = spark.createDataFrame([('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)],
                                ['name', 'age'])
data_df.show()
+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+
data_df.show(1)
+------+---+
|  name|age|
+------+---+
|Brooke| 20|
+------+---+
only showing top 1 row
data_df.take(2) # Get the first 2 rows
[Row(name='Brooke', age=20), Row(name='Denny', age=31)]
data_df.tail(2) # Get the last 2 rows
[Row(name='TD', age=35), Row(name='Brooke', age=25)]
avg_df = data_df.groupBy('name').agg(avg('age'))
avg_df.show()
+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+
Note
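This version is more expressive and simpler than the RDD version: it states what to compute (the average age per name) with high-level operators, rather than spelling out how to build and merge (sum, count) pairs.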
spark.stop()