Loading and Saving Data

With Spark, it's simple to load data in any familiar format from a variety of sources.

First let's download some data...

Download an example dataset: one version of the data is in CSV format, the other in JSON.

Python

import requests
import os

os.makedirs("/data", exist_ok=True)

with open("/data/winemag-reviews.csv", "wb") as file:
    csv_url = "http://tinyurl.com/winemag-reviews-csv"
    file.write(requests.get(csv_url).content)

with open("/data/winemag-reviews.json", "wb") as file:
    json_url = "http://tinyurl.com/winemag-reviews-json"
    file.write(requests.get(json_url).content)

Loading Files

Use spark.read.format() to specify the format of the data you want to load. Spark supports all major data storage formats, including CSV, JSON, Parquet, and more.

You can use .printSchema() to see the schema of the data you loaded.

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

csv_path = "/data/winemag-reviews.csv"
df1 = spark.read.format("csv").load(csv_path, header=True)
df1.printSchema()

json_path = "/data/winemag-reviews.json"
df2 = spark.read.format("json").load(json_path)
df2.printSchema()

#  Output of df1.printSchema() (the CSV load):
#  root
#   |-- _c0: string (nullable = true)
#   |-- country: string (nullable = true)
#   |-- description: string (nullable = true)
#   |-- designation: string (nullable = true)
#   |-- points: string (nullable = true)
#   |-- price: string (nullable = true)
#   |-- province: string (nullable = true)
#   |-- region_1: string (nullable = true)
#   |-- region_2: string (nullable = true)
#   |-- taster_name: string (nullable = true)
#   |-- taster_twitter_handle: string (nullable = true)
#   |-- title: string (nullable = true)
#   |-- variety: string (nullable = true)
#   |-- winery: string (nullable = true)
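
Notice that every column in the CSV load came back as a string. Spark does not infer CSV column types by default; if you want typed columns, one option is to turn on the inferSchema option, which costs an extra pass over the data:

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

csv_path = "/data/winemag-reviews.csv"

# Ask Spark to scan the file and infer column types
# (e.g. numeric columns such as points come back as numbers)
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(csv_path)
)
df.printSchema()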

Saving Files

You can write your DataFrame out with its write attribute. When saving a file, you typically need to specify a few parameters:

  • the save mode with .mode()
  • the output file format with .format()
  • (optional) the columns to partition the output by with .partitionBy()

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

json_path = "/data/winemag-reviews.json"
df = spark.read.format("json").load(json_path)

# E.g. Writing to parquet
out_fp = "/data/winemag-reviews.parquet"
df.write.mode("overwrite").format("parquet").save(out_fp) # if data already exists, overwrite
# df.write.mode("append").format("parquet").save(out_fp) # if data exists, append, otherwise create
# df.write.mode("error").format("parquet").save(out_fp) # if data already exists, throw exception
# df.write.mode("ignore").format("parquet").save(out_fp) # if data alread exists, do nothing

# E.g. Writing with partitions
out_fp = "/data/winemag-reviews.parquet"
(
    df.write.mode("overwrite")
    .format("parquet")
    .partitionBy("variety")
    .save(out_fp)
)
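
Partitioned output is written as a directory tree with one subdirectory per partition value (e.g. variety=Riesling/), and filtering on the partition column lets Spark read only the matching subdirectories. Here's a minimal sketch of reading the partitioned data back, assuming "Riesling" appears as a variety in this dataset:

Python

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

out_fp = "/data/winemag-reviews.parquet"

# Load the partitioned dataset; the partition column reappears as a regular column
df = spark.read.format("parquet").load(out_fp)

# Spark can prune to the matching variety=... subdirectory for this filter
df.where(F.col("variety") == "Riesling").show(5)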

This is just a starting point; for more information, check out the official docs.