User Defined Functions

User Defined Functions (UDFs) allow for flexibility in data processing, beyond what is available in standard Spark DataFrame operations. There are two general flavors of UDFs:

(1) Standard Spark UDFs, which run a native Python function on one row at a time

(2) Pandas UDFs, which pass the selected columns to your function as pandas Series.

We'll go through a quick example of each. First let's load our data.

(Example notebook can be found here)

python

from pyspark.sql import SparkSession, functions as F

# Get or create the SparkSession
spark = SparkSession.builder.getOrCreate()

# Load the dataset we downloaded earlier
df = spark.read.format("json").load("/data/winemag-reviews.json")
df = df.select("country","description","points","price","taster_name","title","variety")
df.show(4)

# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# | country|         description|points|price|       taster_name|               title|       variety|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+
# |   Italy|Aromas include tr...|    87| NULL|     Kerin O’Keefe|Nicosia 2013 Vulk...|   White Blend|
# |Portugal|This is ripe and ...|    87|   15|        Roger Voss|Quinta dos Avidag...|Portuguese Red|
# |      US|Tart and snappy, ...|    87|   14|      Paul Gregutt|Rainstorm 2013 Pi...|    Pinot Gris|
# |      US|Pineapple rind, l...|    87|   13|Alexander Peartree|St. Julian 2013 R...|      Riesling|
# +--------+--------------------+------+-----+------------------+--------------------+--------------+

Standard UDFs

Standard UDFs are created with udf from the functions module, which we've been importing under the alias F.

F.udf accepts three parameters:

  • f: (function) → the Python function to wrap.
  • (Optional) returnType: (pyspark.sql.types.DataType or str) → the return type of the user-defined function. Defaults to StringType.
  • (Optional) useArrow: (bool) → whether to use Arrow to optimize the (de)serialization.
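
The parameters above can be sketched as follows. This is a minimal illustration, not the notebook's own code: reverse_text and build_reverse_udfs are hypothetical names, and pyspark is imported inside the builder only so that the plain Python function can be tried without a Spark installation. It shows returnType given once as a DataType object and once as a type string.

```python
def reverse_text(s):
    """Reverse a string, passing None through so null cells do not raise."""
    return None if s is None else s[::-1]


def build_reverse_udfs():
    """Wrap reverse_text as a Spark UDF in two equivalent ways (hypothetical helper)."""
    # Imported here, not at module level, so reverse_text stays usable without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # returnType given as a DataType object
    udf_from_type = F.udf(reverse_text, StringType())
    # returnType given as a type string
    udf_from_string = F.udf(reverse_text, "string")
    return udf_from_type, udf_from_string
```

Either returned object can then be applied to a column, for example df.select(udf_from_type("country")).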

python

# Method 1. Define them as new objects
# Null cells arrive as None, so pass them through instead of slicing
reverse_word = F.udf(lambda s: s[::-1] if s is not None else None, "string")

df.select("country", reverse_word("country").alias("yrtnuoc")).show(4, True)

# +--------+--------+
# | country| yrtnuoc|
# +--------+--------+
# |   Italy|   ylatI|
# |Portugal|lagutroP|
# |      US|      SU|
# |      US|      SU|
# +--------+--------+

# Method 2. Use them as a decorator
@F.udf("string") # return type
def reverse_word2(col):
    # Null cells arrive as None, so pass them through instead of slicing
    return col[::-1] if col is not None else None

df.select("country", reverse_word2("country").alias("yrtnuoc")).show(4, True)

# +--------+--------+
# | country| yrtnuoc|
# +--------+--------+
# |   Italy|   ylatI|
# |Portugal|lagutroP|
# |      US|      SU|
# |      US|      SU|
# +--------+--------+
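
Both UDFs above return strings. As a minimal sketch of a different return type, the function below counts the words in a review. The names count_words and build_word_count_udf are illustrative and not part of the original notebook, and pyspark is imported inside the builder so the plain function can be tried on its own.

```python
def count_words(text):
    """Count whitespace-separated words; a null cell (None) yields None."""
    if text is None:
        return None
    return len(text.split())


def build_word_count_udf():
    """Wrap count_words as a Spark UDF that returns an integer (hypothetical helper)."""
    # Imported here, not at module level, so count_words stays usable without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    # The declared returnType should match what the Python function returns.
    return F.udf(count_words, IntegerType())
```

With the wine DataFrame loaded earlier, df.select("title", build_word_count_udf()("description").alias("n_words")) would add an integer word count next to each title.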

Pandas UDFs

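Pandas UDFs pass your data to a function as a pandas Series. Spark uses Apache Arrow to transfer the data and lets pandas operate on it in batches, which is usually much faster than a standard UDF that runs one row at a time. For more information see here.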

Let's create a new dataset and see an example.

python

import pandas as pd
from pyspark.sql import functions as F

# Create a Spark DataFrame with a new number series
df = spark.createDataFrame([(1, 2.0), (1, 4.0), (3, 6.0), (3, 8.0), (3, 12.0)], ("id", "v"))
df.show()
# OUTPUT:
# +---+----+
# | id|   v|
# +---+----+
# |  1| 2.0|
# |  1| 4.0|
# |  3| 6.0|
# |  3| 8.0|
# |  3|12.0|
# +---+----+

# Declare the function and create the UDF (User Defined Function)
# The UDF takes a pandas Series and returns the mean of the series
@F.pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

# Apply the UDF to the entire 'v' column
df.select(mean_udf('v')).show()
# OUTPUT:
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        6.4|
# +-----------+


# Group the DataFrame by 'id' and calculate the mean of 'v' for each group
df.groupby("id").agg(mean_udf('v')).show()
# OUTPUT:
# +---+-----------------+
# | id|      mean_udf(v)|
# +---+-----------------+
# |  1|              3.0|
# |  3|8.666666666666666|
# +---+-----------------+
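
The mean_udf above maps a whole Series to a single value. As a contrasting sketch, a pandas UDF can also return a Series of the same length, which gives one output row per input row. The names double_values, build_double_udf, and double_v are made up for illustration, and pyspark and pandas are imported inside the builder only so that the plain function can be tried without Spark.

```python
def double_values(v):
    """Multiply by two; on a pandas Series this is a single vectorized operation."""
    return v * 2


def build_double_udf():
    """Wrap double_values as a Series-to-Series pandas UDF (hypothetical helper)."""
    # Imported here, not at module level, so double_values stays usable without Spark.
    import pandas as pd
    from pyspark.sql import functions as F

    # The pd.Series -> pd.Series type hints mark this as an element-wise UDF:
    # the returned Series must have the same length as the input Series.
    @F.pandas_udf("double")
    def double_v(v: pd.Series) -> pd.Series:
        return double_values(v)

    return double_v
```

With the id/v DataFrame above, df.select("id", build_double_udf()("v")) would return five rows, one doubled value per input row, instead of a single aggregate.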