Loading and Saving Data
With Spark, it's simple to load data in all the common formats from a variety of sources.
(Example notebook can be found here)
First, let's download some data.
We'll grab an example dataset that comes in two versions: one in CSV format and one in JSON.
Python
import requests
import os

os.makedirs("/data", exist_ok=True)

# Download the CSV version
csv_url = "http://tinyurl.com/winemag-reviews-csv"
with open("/data/winemag-reviews.csv", "wb") as file:
    file.write(requests.get(csv_url).content)

# Download the JSON version
json_url = "http://tinyurl.com/winemag-reviews-json"
with open("/data/winemag-reviews.json", "wb") as file:
    file.write(requests.get(json_url).content)
Loading Files
Use spark.read.format() to specify the format of the data you want to load. Spark supports all the major data storage formats, including CSV, JSON, Parquet, and many more.
You can call .printSchema() to see the schema of the data you loaded.
Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
csv_path = "/data/winemag-reviews.csv"
df1 = spark.read.format("csv").load(csv_path, header=True)  # header=True: use the first row as column names
df1.printSchema()
json_path = "/data/winemag-reviews.json"
df2 = spark.read.format("json").load(json_path)
df2.printSchema()
# Schema of the CSV DataFrame (df1); every column is read as a string by default:
# root
# |-- _c0: string (nullable = true)
# |-- country: string (nullable = true)
# |-- description: string (nullable = true)
# |-- designation: string (nullable = true)
# |-- points: string (nullable = true)
# |-- price: string (nullable = true)
# |-- province: string (nullable = true)
# |-- region_1: string (nullable = true)
# |-- region_2: string (nullable = true)
# |-- taster_name: string (nullable = true)
# |-- taster_twitter_handle: string (nullable = true)
# |-- title: string (nullable = true)
# |-- variety: string (nullable = true)
# |-- winery: string (nullable = true)
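Notice that every column in the CSV DataFrame comes back as a string. If you want typed columns, one option is to ask Spark to infer them with the inferSchema option. A minimal sketch, reusing the same file path as above (inference costs an extra pass over the file):
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
csv_path = "/data/winemag-reviews.csv"

# Ask Spark to infer column types from the data (an extra pass over the file)
df_inferred = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(csv_path)
)
df_inferred.printSchema()  # points and price should now come back as numeric types

# For production jobs, an explicit schema built with pyspark.sql.types.StructType
# and passed via .schema(...) skips the inference pass and guarantees stable types.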
Saving Files
You can write your dataframe with the write attribute. When saving a file, you typically need to specify a few parameters:
- specify the write mode with .mode()
- specify the save file format with .format()
- (optional) indicate whether you want partitioned columns with .partitionBy()
Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
json_path = "/data/winemag-reviews.json"
df = spark.read.format("json").load(json_path)
# E.g. Writing to parquet
out_fp = "/data/winemag-reviews.parquet"
df.write.mode("overwrite").format("parquet").save(out_fp) # if data already exists, overwrite
# df.write.mode("append").format("parquet").save(out_fp) # if data exists, append, otherwise create
# df.write.mode("error").format("parquet").save(out_fp) # if data already exists, throw exception
# df.write.mode("ignore").format("parquet").save(out_fp) # if data alread exists, do nothing
# E.g. Writing with partitions
out_fp = "/data/winemag-reviews.parquet"
(
df.write.mode("overwrite")
.format("parquet")
.partitionBy("variety")
.save(out_fp)
)
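To see the benefit, you can read the partitioned output back and filter on the partition column; Spark then only scans the matching variety=... directories (partition pruning). A minimal sketch, reusing the output path from above (the "Riesling" value is just an illustrative filter):
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
out_fp = "/data/winemag-reviews.parquet"

# Spark discovers the variety=... subdirectories and exposes variety as a column
df_back = spark.read.format("parquet").load(out_fp)

# Filtering on the partition column lets Spark skip every non-matching directory
df_back.filter(df_back.variety == "Riesling").show(5)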
Quick note on using partitions: partitioning can greatly improve query performance. You generally want to partition on columns that are used frequently in your queries, but be careful not to pick a column that creates too many partitions.
A partition does not work like an index in SQL. If you're working with customer transaction data, you might be tempted to partition on customer_id, but you'd end up with a partition for every customer. Partitioning on year-month or date, however, might be a good idea.
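As a rough sketch of that idea (the transaction data, column names, and output path here are made up for illustration), you could derive a coarse year-month key from a timestamp and partition on that instead of customer_id:
Python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical transaction data, just to make the example self-contained
transactions = spark.createDataFrame(
    [("c1", "2023-01-15", 42.0), ("c2", "2023-02-03", 13.5)],
    ["customer_id", "transaction_date", "amount"],
)

# Derive a coarse year-month key instead of partitioning on customer_id
transactions = transactions.withColumn(
    "year_month", F.date_format(F.to_date("transaction_date"), "yyyy-MM")
)

(
    transactions.write.mode("overwrite")
    .format("parquet")
    .partitionBy("year_month")  # a handful of directories per year, not one per customer
    .save("/data/transactions.parquet")
)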
This is just a starting point. For more information, check out the official docs.