Window Functions
Window functions let us compute an aggregate or a ranking over different slices of a dataframe in a single step, while keeping every input row. They work the same way in Spark as they do in standard SQL. You can create windowed functions using pyspark.sql.Window or using F.expr with plain SQL syntax.
Let's start with a question: "What are the top 3 wines for each reviewer?"
First, let's load our dataframe.
(Example notebook can be found here)
Python
from pyspark.sql import SparkSession, functions as F
# Get or create the Spark session
spark = SparkSession.builder.getOrCreate()
# Load the dataset we downloaded earlier
df = spark.read.format("json").load("/data/winemag-reviews.json")
df = df.select("country", "description", "points", "price", "taster_name", "title", "variety")
df.show(4)
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# | country|         description|points|price|       taster_name|               title|       variety|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# |   Italy|Aromas include tr...|    87| NULL|     Kerin O’Keefe|Nicosia 2013 Vulk...|   White Blend|
# |Portugal|This is ripe and ...|    87|   15|        Roger Voss|Quinta dos Avidag...|Portuguese Red|
# |      US|Tart and snappy, ...|    87|   14|      Paul Gregutt|Rainstorm 2013 Pi...|    Pinot Gris|
# |      US|Pineapple rind, l...|    87|   13|Alexander Peartree|St. Julian 2013 R...|      Riesling|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
Using the Window object
Python
# Using a window object
from pyspark.sql import Window
w = Window.partitionBy("taster_name").orderBy(F.desc("points"))
result_df = (
    df
    .where("taster_name is not null")
    .withColumn("wine_rank", F.rank().over(w))
    .where("wine_rank <= 3")
    .select("taster_name", "title", "points", "wine_rank")
)
result_df.show(4)
# +------------------+--------------------+------+---------+
# |       taster_name|               title|points|wine_rank|
# +------------------+--------------------+------+---------+
# |Alexander Peartree|Lovingston 2012 J...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Anna Lee C. Iijima|Robert Weil 2015 ...|    98|        1|
# +------------------+--------------------+------+---------+
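Notice that all three of Alexander Peartree's wines share rank 1. This is because rank() gives tied rows the same rank, so filtering on wine_rank <= 3 can return more than three rows per taster. The plain-Python sketch below does not use Spark. It only illustrates how the three standard ranking functions (rank, dense_rank and row_number) number a list with ties. The helper `rank_rows` and the scores are hypothetical.

```python
def rank_rows(scores):
    """Return (score, rank, dense_rank, row_number) tuples, highest score first."""
    ordered = sorted(scores, reverse=True)
    out = []
    rank = dense = 0
    prev = None
    for row_number, score in enumerate(ordered, start=1):
        if score != prev:
            rank = row_number  # rank skips ahead to the current position after a tie
            dense += 1         # dense_rank always moves up by exactly one
            prev = score
        out.append((score, rank, dense, row_number))
    return out

# Hypothetical scores with a three-way tie at the top
ranked = rank_rows([91, 91, 91, 90, 88])
for row in ranked:
    print(row)
# (91, 1, 1, 1)
# (91, 1, 1, 2)
# (91, 1, 1, 3)
# (90, 4, 2, 4)
# (88, 5, 3, 5)
```

If you need exactly three rows per taster, swapping F.rank() for F.row_number() is one option. Be aware that it then picks among tied wines arbitrarily unless the window's orderBy includes a tie-breaking column.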
Using Expressions
Python
# Using an expression
result_df = (
    df
    .where("taster_name is not null")
    .withColumn("wine_rank", F.expr("rank() over (partition by taster_name order by points desc)"))
    .where("wine_rank <= 3")
    .select("taster_name", "title", "points", "wine_rank")
)
result_df.show(4)
# +------------------+--------------------+------+---------+
# |       taster_name|               title|points|wine_rank|
# +------------------+--------------------+------+---------+
# |Alexander Peartree|Lovingston 2012 J...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Anna Lee C. Iijima|Robert Weil 2015 ...|    98|        1|
# +------------------+--------------------+------+---------+