Joins
Joins are notoriously tricky in libraries like Pandas and NumPy. In Spark, however, they are about as close as you can get to the joins we all know and love in SQL.
Let's load some data first.
(Example notebook can be found here)
Python
from pyspark.sql import SparkSession, functions as F
# Initialize the Spark session
spark = SparkSession.builder.getOrCreate()
# Load the dataset we downloaded earlier
df = spark.read.format("json").load("/data/winemag-reviews.json")
df = df.select("country","description","points","price","taster_name","title","variety")
df.show(4)
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# | country|         description|points|price|       taster_name|               title|       variety|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# |   Italy|Aromas include tr...|    87| NULL|     Kerin O’Keefe|Nicosia 2013 Vulk...|   White Blend|
# |Portugal|This is ripe and ...|    87|   15|        Roger Voss|Quinta dos Avidag...|Portuguese Red|
# |      US|Tart and snappy, ...|    87|   14|      Paul Gregutt|Rainstorm 2013 Pi...|    Pinot Gris|
# |      US|Pineapple rind, l...|    87|   13|Alexander Peartree|St. Julian 2013 R...|      Riesling|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
"Simple" Joins
You can join two dataframes together using the .join() method on a common set of columns.
Let's filter our dataframe above to just show results from the reviewer with the most reviews.
Python
# Let's filter to the reviewer with the most reviews
most_reviews_df = (
    df
    .where("taster_name IS NOT NULL")
    .groupBy("taster_name")
    .agg(F.count("*").alias("ct_reviews"))
    .orderBy(F.desc("ct_reviews"))  # .orderBy() works the same as in SQL
    .limit(1)  # .limit() also works the same as in SQL
)
most_reviews_df.show()
# +-----------+----------+
# |taster_name|ct_reviews|
# +-----------+----------+
# | Roger Voss|     25514|
# +-----------+----------+
# We can use `.join()` and join on a common set of columns
result_df = df.join(most_reviews_df, on=["taster_name"], how="inner")
result_df.show(4)
# +-----------+--------+--------------------+------+-----+--------------------+--------------+----------+
# |taster_name| country|         description|points|price|               title|       variety|ct_reviews|
# +-----------+--------+--------------------+------+-----+--------------------+--------------+----------+
# | Roger Voss|Portugal|This is ripe and ...|    87|   15|Quinta dos Avidag...|Portuguese Red|     25514|
# | Roger Voss|  France|This dry and rest...|    87|   24|Trimbach 2012 Gew...|Gewürztraminer|     25514|
# | Roger Voss|  France|This has great de...|    87|   27|Jean-Baptiste Ada...|    Pinot Gris|     25514|
# | Roger Voss|  France|This is a dry win...|    87|   30|Leon Beyer 2012 G...|Gewürztraminer|     25514|
# +-----------+--------+--------------------+------+-----+--------------------+--------------+----------+
"Complex" Joins
You can also join two dataframes together using conditional column logic. Let's look at the previous example again, with explicit conditional column logic.
Python
result_df = df.join(
    most_reviews_df,
    on=[df["taster_name"] == most_reviews_df["taster_name"]],  # Explicit column logic
    how="inner",
)
result_df.show(4)
# +--------+--------------------+------+-----+-----------+--------------------+--------------+-----------+----------+
# | country|         description|points|price|taster_name|               title|       variety|taster_name|ct_reviews|
# +--------+--------------------+------+-----+-----------+--------------------+--------------+-----------+----------+
# |Portugal|This is ripe and ...|    87|   15| Roger Voss|Quinta dos Avidag...|Portuguese Red| Roger Voss|     25514|
# |  France|This dry and rest...|    87|   24| Roger Voss|Trimbach 2012 Gew...|Gewürztraminer| Roger Voss|     25514|
# |  France|This has great de...|    87|   27| Roger Voss|Jean-Baptiste Ada...|    Pinot Gris| Roger Voss|     25514|
# |  France|This is a dry win...|    87|   30| Roger Voss|Leon Beyer 2012 G...|Gewürztraminer| Roger Voss|     25514|
# +--------+--------------------+------+-----+-----------+--------------------+--------------+-----------+----------+
Did you notice that the taster_name column was duplicated? This is because we used explicit column logic to join the two dataframes together. You can use .drop() to remove the duplicate column, but you need to be careful to specify which dataframe's copy of the column you want to drop.
Python
result_df = df.join(
    most_reviews_df,
    on=[df["taster_name"] == most_reviews_df["taster_name"]],
    how="inner",
)
result_df = result_df.drop(most_reviews_df["taster_name"])  # Add .drop() to remove duplicated column
result_df.show(4)
# +--------+--------------------+------+-----+--------------------+--------------+-----------+----------+
# | country|         description|points|price|               title|       variety|taster_name|ct_reviews|
# +--------+--------------------+------+-----+--------------------+--------------+-----------+----------+
# |Portugal|This is ripe and ...|    87|   15|Quinta dos Avidag...|Portuguese Red| Roger Voss|     25514|
# |  France|This dry and rest...|    87|   24|Trimbach 2012 Gew...|Gewürztraminer| Roger Voss|     25514|
# |  France|This has great de...|    87|   27|Jean-Baptiste Ada...|    Pinot Gris| Roger Voss|     25514|
# |  France|This is a dry win...|    87|   30|Leon Beyer 2012 G...|Gewürztraminer| Roger Voss|     25514|
# +--------+--------------------+------+-----+--------------------+--------------+-----------+----------+
Let's be honest though, the join-then-drop query above got pretty ugly. Let's try to clean it up.
Suggested Approach
Generally, joins are where people tend to shoot themselves in the foot. It's easy to blow up your row count by joining on keys that aren't unique, to end up with ambiguous columns through column duplication, or to just make a mess of things.
- Be explicit about which columns are included in each dataset before you join.
- If you are joining on the exact same columns in both dataframes, use the "simple" method.
- If you have to do "complex" joins, use expressions and alias your dataframes. (See below)
Here's an example of the aliasing approach.
Python
result_df = (
    df.alias("src")  # (1a) You can alias your datasets at any step
    .join(
        most_reviews_df.alias("most"),  # (1b) Alias your joining dataset as well
        on=F.expr("src.taster_name == most.taster_name"),  # (2) Use an expression to join your columns
        how="inner",
    )
    .selectExpr(
        "most.taster_name as tasted_most_wines",  # (3) Explicitly select only the columns you need
        "src.title",                              #     from each data source
        "src.points",
        "src.price",
        "src.variety",
    )
)
result_df.show(4)
# +-----------------+--------------------+------+-----+--------------+
# |tasted_most_wines|               title|points|price|       variety|
# +-----------------+--------------------+------+-----+--------------+
# |       Roger Voss|Quinta dos Avidag...|    87|   15|Portuguese Red|
# |       Roger Voss|Trimbach 2012 Gew...|    87|   24|Gewürztraminer|
# |       Roger Voss|Jean-Baptiste Ada...|    87|   27|    Pinot Gris|
# |       Roger Voss|Leon Beyer 2012 G...|    87|   30|Gewürztraminer|
# +-----------------+--------------------+------+-----+--------------+