User Defined Functions
User Defined Functions (UDFs) allow for flexibility in data processing beyond what is available in standard Spark DataFrame operations. There are two general flavors of UDFs:
(1) Standard Spark UDFs, which run native Python code row by row
(2) Pandas UDFs, which pass your selected data to the function as pandas Series for vectorized processing
We'll go through a quick example of each. First let's load our data.
(An example notebook can be found here.)
python
from pyspark.sql import SparkSession, functions as F
# Initialize the Spark session
spark = SparkSession.builder.getOrCreate()
# Load the dataset we downloaded earlier (header is a CSV option, so it isn't needed for JSON)
df = spark.read.format("json").load("/data/winemag-reviews.json")
df = df.select("country", "description", "points", "price", "taster_name", "title", "variety")
df.show(4)
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# | country| description|points|price| taster_name| title| variety|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# | Italy|Aromas include tr...| 87| NULL| Kerin O’Keefe|Nicosia 2013 Vulk...| White Blend|
# |Portugal|This is ripe and ...| 87| 15| Roger Voss|Quinta dos Avidag...|Portuguese Red|
# | US|Tart and snappy, ...| 87| 14| Paul Gregutt|Rainstorm 2013 Pi...| Pinot Gris|
# | US|Pineapple rind, l...| 87| 13|Alexander Peartree|St. Julian 2013 R...| Riesling|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
Standard UDFs
Standard UDFs can be created through the functions module that we've been importing and aliasing as F. F.udf accepts three parameters:
(1) f (function) → the Python function to wrap.
(2) returnType (pyspark.sql.types.DataType or str, optional) → the return type of the user-defined function; defaults to StringType.
(3) useArrow (bool, optional) → whether to use Arrow to optimize the (de)serialization of the data.
python
# Method 1. Define them as new objects
reverse_word = F.udf(lambda s: s[::-1], "string")
df.select("country", reverse_word("country").alias("yrtnuoc")).show(4, True)
# +--------+--------+
# | country| yrtnuoc|
# +--------+--------+
# | Italy| ylatI|
# |Portugal|lagutroP|
# | US| SU|
# | US| SU|
# +--------+--------+
# Method 2. Use them as a decorator
@F.udf("String") # return type
def reverse_word2(col) -> str:
return col[::-1]
df.select("country", reverse_word2("country").alias("yrtnuoc")).show(4, True)
# +--------+--------+
# | country| yrtnuoc|
# +--------+--------+
# | Italy| ylatI|
# |Portugal|lagutroP|
# | US| SU|
# | US| SU|
# +--------+--------+
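One caveat: standard UDFs receive SQL NULLs as Python None, so reverse_word would raise a TypeError on a column with NULLs (like price in our data). A minimal sketch of guarding against that:
python
# SQL NULL arrives in the UDF as Python None; slicing None raises TypeError,
# so pass None through unchanged. (Cast price to string first, since it's numeric.)
@F.udf("string")
def reverse_safe(col):
    return col[::-1] if col is not None else None
df.select("price", reverse_safe(F.col("price").cast("string")).alias("ecirp")).show(4)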
Pandas UDFs
Pandas UDFs allow you to pass your data to a function as pandas Series, using Apache Arrow for efficient transfer. For more information see here.
Let's create a new dataset and see an example.
python
import pandas as pd
from pyspark.sql import functions as F
# Create a Spark DataFrame with a new number series
df = spark.createDataFrame([(1, 2.0), (1, 4.0), (3, 6.0), (3, 8.0), (3, 12.0)], ("id", "v"))
df.show()
# OUTPUT:
# +---+----+
# | id| v|
# +---+----+
# | 1| 2.0|
# | 1| 4.0|
# | 3| 6.0|
# | 3| 8.0|
# | 3|12.0|
# +---+----+
# Declare the function and create the UDF (User Defined Function)
# The UDF takes a pandas Series and returns the mean of the series
@F.pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()
# Apply the UDF to the entire 'v' column
df.select(mean_udf('v')).show()
# OUTPUT:
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 6.4|
# +-----------+
# Group the DataFrame by 'id' and calculate the mean of 'v' for each group
df.groupby("id").agg(mean_udf('v')).show()
# OUTPUT:
# +---+-----------------+
# | id| mean_udf(v)|
# +---+-----------------+
# | 1| 3.0|
# | 3|8.666666666666666|
# +---+-----------------+
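mean_udf above is the Series-to-scalar flavor, which behaves like an aggregate function. The other common flavor is Series-to-Series, where each Arrow batch arrives as a pandas Series and you return a Series of the same length; a minimal sketch:
python
# Series-to-Series pandas UDF: transforms values vectorized, one Arrow batch at a time
@F.pandas_udf("double")
def plus_one(v: pd.Series) -> pd.Series:
    return v + 1
df.select("v", plus_one("v").alias("v_plus_one")).show()
# Expected: each value of v with 1.0 added (3.0, 5.0, 7.0, 9.0, 13.0)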