Window Functions

Window functions compute a value for each row from a group of related rows (the "window"). This lets us aggregate over different slices of our dataframe without collapsing it the way a groupBy does. They work the same way in Spark as they do in standard SQL. You can define a window function either with the pyspark.sql.Window object or with F.expr and plain SQL syntax.
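To make "aggregate without collapsing rows" concrete, here is a plain-Python sketch (no Spark involved) of what an aggregate such as `avg(points) over (partition by taster_name)` computes. Every input row is kept and receives its partition's average, whereas a GROUP BY would return one row per taster. The taster names, titles and scores are made-up toy data, not rows from the wine dataset, and this is only an illustration of the semantics, not how Spark executes it.

```python
from collections import defaultdict

# Hypothetical toy rows: (taster_name, title, points)
rows = [
    ("Taster A", "Wine 1", 87),
    ("Taster A", "Wine 2", 91),
    ("Taster B", "Wine 3", 88),
]

def avg_over_partition(rows, key_index, value_index):
    """Mimic avg(value) over (partition by key): keep every row and append its partition's average."""
    # Pass 1: collect the values belonging to each partition key
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_index]].append(row[value_index])
    averages = {key: sum(vals) / len(vals) for key, vals in groups.items()}
    # Pass 2: attach the partition average to each original row (no rows are collapsed)
    return [row + (averages[row[key_index]],) for row in rows]

windowed = avg_over_partition(rows, key_index=0, value_index=2)
for row in windowed:
    print(row)
# ('Taster A', 'Wine 1', 87, 89.0)
# ('Taster A', 'Wine 2', 91, 89.0)
# ('Taster B', 'Wine 3', 88, 88.0)
```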

Let's start with a question: "What are the top 3 wines for each reviewer?"

First, let's load our dataframe.

(An example notebook can be found here.)

Python

from pyspark.sql import SparkSession, functions as F

# Create (or reuse) a SparkSession
spark = SparkSession.builder.getOrCreate()

# Load the dataset we downloaded earlier and keep only the columns we need
df = spark.read.format("json").load("/data/winemag-reviews.json")
df = df.select("country", "description", "points", "price", "taster_name", "title", "variety")
df.show(4)

#  +--------+--------------------+------+-----+------------------+--------------------+--------------+
#  | country|         description|points|price|       taster_name|               title|       variety|
#  +--------+--------------------+------+-----+------------------+--------------------+--------------+
#  |   Italy|Aromas include tr...|    87| NULL|     Kerin O’Keefe|Nicosia 2013 Vulk...|   White Blend|
#  |Portugal|This is ripe and ...|    87|   15|        Roger Voss|Quinta dos Avidag...|Portuguese Red|
#  |      US|Tart and snappy, ...|    87|   14|      Paul Gregutt|Rainstorm 2013 Pi...|    Pinot Gris|
#  |      US|Pineapple rind, l...|    87|   13|Alexander Peartree|St. Julian 2013 R...|      Riesling|
#  +--------+--------------------+------+-----+------------------+--------------------+--------------+

Using the Window object

Python

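# Using a window object
from pyspark.sql import Window

# Partition rows by reviewer and order each partition from highest to lowest score
w = Window.partitionBy("taster_name").orderBy(F.desc("points"))
result_df = (
    df
    .where("taster_name is not null")              # drop reviews without a taster
    .withColumn("wine_rank", F.rank().over(w))     # rank each wine within its taster's partition
    .where("wine_rank <= 3")                       # keep the top 3 ranks per taster
    .select("taster_name", "title", "points", "wine_rank")
)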
result_df.show(4)

# +------------------+--------------------+------+---------+
# |       taster_name|               title|points|wine_rank|
# +------------------+--------------------+------+---------+
# |Alexander Peartree|Lovingston 2012 J...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Anna Lee C. Iijima|Robert Weil 2015 ...|    98|        1|
# +------------------+--------------------+------+---------+
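In the output, three Alexander Peartree wines all have rank 1. `rank()` assigns the same rank to tied rows and then skips ahead. Because of this, a "top 3" filter can return more than three rows per taster. Spark also provides `F.dense_rank()`, where ties share a rank and no gaps are left. It also provides `F.row_number()`, which gives each row a unique sequence number; among tied rows that order is not guaranteed unless the ordering is made unique. The plain-Python sketch below shows how the three functions differ on tied scores. It assumes the partition is already sorted by points in descending order, and the scores are hypothetical. It illustrates the semantics only and is not Spark's implementation.

```python
def ranking_functions(sorted_scores):
    """Return (score, rank, dense_rank, row_number) tuples for scores already sorted descending."""
    results = []
    rank = dense_rank = 0
    previous = None
    for row_number, score in enumerate(sorted_scores, start=1):
        if score != previous:
            # New score value: rank jumps to this row's position, dense_rank advances by one
            rank = row_number
            dense_rank += 1
            previous = score
        results.append((score, rank, dense_rank, row_number))
    return results

# Hypothetical points for one taster's partition, sorted descending
ranked = ranking_functions([91, 91, 91, 90, 88])
for score, rank, dense_rank, row_number in ranked:
    print(score, rank, dense_rank, row_number)
# 91 1 1 1
# 91 1 1 2
# 91 1 1 3
# 90 4 2 4
# 88 5 3 5
```

With these scores, `rank <= 3` keeps the three 91-point rows and `dense_rank <= 3` keeps all five rows. `row_number <= 3` keeps exactly three, so it is the one to use when you need a fixed number of rows per taster.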

Using Expressions

Python

# Using an expression
result_df = (
    df
    .where("taster_name is not null")
    .withColumn("wine_rank", F.expr("rank() over (partition by taster_name order by points desc)"))
    .where("wine_rank <= 3")
    .select("taster_name", "title", "points", "wine_rank")
)
result_df.show(4)

# +------------------+--------------------+------+---------+
# |       taster_name|               title|points|wine_rank|
# +------------------+--------------------+------+---------+
# |Alexander Peartree|Lovingston 2012 J...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Alexander Peartree|Bel Lago 2013 Cha...|    91|        1|
# |Anna Lee C. Iijima|Robert Weil 2015 ...|    98|        1|
# +------------------+--------------------+------+---------+
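Because the expression is ordinary SQL, the same window clause also runs in other SQL engines. The sketch below is a hedged illustration using Python's built-in sqlite3 module on a few made-up rows. It assumes the bundled SQLite is version 3.25 or newer, the first release with window functions. Note the subquery. In standard SQL a window function is evaluated after WHERE, so its result cannot be filtered in the same SELECT. The DataFrame version avoids this problem by adding wine_rank with withColumn first and filtering afterwards.

```python
import sqlite3

# In-memory database with a few hypothetical reviews (not from the wine dataset)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (taster_name TEXT, title TEXT, points INTEGER)")
conn.executemany(
    "INSERT INTO reviews VALUES (?, ?, ?)",
    [
        ("Taster A", "Wine 1", 95),
        ("Taster A", "Wine 2", 92),
        ("Taster A", "Wine 3", 90),
        ("Taster A", "Wine 4", 85),
        ("Taster B", "Wine 5", 88),
        (None, "Wine 6", 99),
    ],
)

# Same window clause as the F.expr example; the outer query filters on the computed rank
query = """
SELECT taster_name, title, points, wine_rank
FROM (
    SELECT taster_name, title, points,
           rank() OVER (PARTITION BY taster_name ORDER BY points DESC) AS wine_rank
    FROM reviews
    WHERE taster_name IS NOT NULL
)
WHERE wine_rank <= 3
ORDER BY taster_name, wine_rank
"""
top_wines = conn.execute(query).fetchall()
conn.close()

for row in top_wines:
    print(row)
# ('Taster A', 'Wine 1', 95, 1)
# ('Taster A', 'Wine 2', 92, 2)
# ('Taster A', 'Wine 3', 90, 3)
# ('Taster B', 'Wine 5', 88, 1)
```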