PySpark 05 - Spark SQL

DataFrames

  • Idea borrowed from pandas and R
  • A DataFrame is an RDD of Row objects.
  • The fields in a Row can be accessed like attributes.

Create a DataFrame

A DataFrame can be created from Row objects

from pyspark.sql import Row

row = Row(name="Alice", age=11)

Fields in a Row can be accessed through row.name or row['name']. In some cases, row['name'] should be used to avoid conflicts with the Row object's own attributes and methods (such as count below).

row = Row(name="Alice", age=11, count=1)
print(row.count)
print(row['count'])

Output:

<built-in method count of Row object at 0x7fb8006ae9f0>
1

A DataFrame can also be constructed from a file

df = spark.read.csv('../data/building.csv', header=True, inferSchema=True)
df.show()

The show() method plays a role similar to head() in pandas. By default, it prints the first 20 rows of the DataFrame.
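A different number of rows can be requested explicitly, and truncation of long cell values can be turned off; a small sketch, assuming df is the DataFrame loaded above:

# Show only the first 5 rows, keeping long cell values untruncated
df.show(5, truncate=False)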

Create an RDD from the dataframe

dfrdd = df.rdd
dfrdd.take(3)
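Once the data is exposed as an RDD of Row objects, ordinary RDD transformations apply, and the result can be turned back into a DataFrame; a minimal sketch, reusing the Country column from building.csv:

from pyspark.sql import Row

# Keep only the Country field of each Row, then rebuild a DataFrame
countries = dfrdd.map(lambda row: Row(Country=row['Country']))
spark.createDataFrame(countries).show(3)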

Operations of SparkSQL

Select

# The following two lines have the same effect
df.select('BuildingID', 'Country').show()
# df.select(df.BuildingID, df.Country).show()
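select() also accepts column expressions, so derived columns can be computed and named in one call; a sketch using the BuildingAge column from the same file:

# Compute a derived column and name it with alias()
df.select('BuildingID', (df.BuildingAge + 10).alias('AgeInTenYears')).show()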

Where (Same as Filter)

from pyspark.sql.functions import lit

# The following two lines have the same effect
df.filter("Country<'USA'").select('BuildingID', lit('OK')).show()
# df.where("Country<'USA'").select('BuildingID', lit('OK')).show()
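The same condition can also be written as a column expression instead of a SQL string; a sketch:

from pyspark.sql.functions import lit

# Column-expression form of the filter above
df.filter(df.Country < 'USA').select('BuildingID', lit('OK')).show()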

GroupBy

df.groupBy('HVACProduct').count().show()
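Other aggregates can be attached to a groupBy() through agg(); a sketch, assuming the BuildingAge column from building.csv:

from pyspark.sql import functions as F

# Count buildings and average their age per HVAC product
df.groupBy('HVACProduct').agg(
    F.count('*').alias('n'),
    F.avg('BuildingAge').alias('avgAge')
).show()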

OrderBy

dfProduct.filter("Color = 'Black'")\
.select('ProductID', 'Name', 'ListPrice')\
.orderBy('ListPrice')\
.show(truncate=False)
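Descending order is obtained with the desc() helper (or a column's .desc() method); a sketch:

from pyspark.sql.functions import desc

# Most expensive black products first
dfProduct.filter("Color = 'Black'")\
    .select('ProductID', 'Name', 'ListPrice')\
    .orderBy(desc('ListPrice'))\
    .show(truncate=False)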

The Closure Feature of SparkSQL

Consider the following code:

d1 = dfDetail.join(dfProduct, 'ProductID') \
.select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
d1.show()
d2 = d1.filter("Color = 'Black'")
d2.show()
d2.explain()

In theory, after the select on d1, the column 'Color' no longer exists. However, this code runs correctly because of Spark's lazy execution: when d2 = d1.filter("Color = 'Black'") is evaluated, Spark actually starts the computation from the original CSV files, and the Catalyst optimizer pushes operations like filter toward the beginning of the plan, since filtering early reduces the size of the DataFrames that have to be processed.

After adding d1.persist() before the d2 = d1.filter("Color = 'Black'") line, the code raises an error saying the column 'Color' can no longer be resolved.
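A sketch of that failing variant (same dfDetail and dfProduct as above); once d1 is persisted, its narrowed schema is fixed, so the later filter on Color can no longer be pushed back to the source:

d1 = dfDetail.join(dfProduct, 'ProductID') \
    .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
d1.show()
d1.persist()
d2 = d1.filter("Color = 'Black'")  # now fails: 'Color' cannot be resolved
d2.show()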

However, in some cases this closure-like behaviour works differently.

x = 5
df1 = df.filter(df._1 < x)
df1.show()
x = 3
df1.show()

Even if unpersist() is used to release every DataFrame from the cache, the two df1.show() calls print the same result. When df is filtered into df1, x is captured by the Catalyst optimizer as a constant, and Spark evaluates the filter with that constant early to speed up the subsequent computation. Only by executing the filter again after x has changed can Spark recompute df1 with x = 3.
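A sketch of that last point: re-running the filter after changing x makes the new constant visible:

x = 3
df1 = df.filter(df._1 < x)  # the filter is re-analysed, so the new value of x is captured
df1.show()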

Embed SQL queries

SQL queries can be run alongside the Spark SQL DataFrame API.

Create a temporary view for SQL queries to refer to

df.createOrReplaceTempView('HVAC')

Using SQL queries instead of SparkSQL APIs

spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10').show()

Mix dataframe API with SQL

# Can even mix DataFrame API with SQL:
df.where('BuildingAge >= 10').createOrReplaceTempView('OldBuildings')
spark.sql('SELECT HVACproduct, COUNT(*) FROM OldBuildings GROUP BY HVACproduct').show()

User-Defined Function (UDF)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

slen = udf(lambda s: len(s)+2, IntegerType())
df.select('*', slen(df['Country']).alias('slen')).show()
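A UDF can also be registered under a name so that it is callable from SQL against a temporary view; a sketch reusing the HVAC view created above:

from pyspark.sql.types import IntegerType

# Register the same function under the name 'slen' for use in SQL
spark.udf.register('slen', lambda s: len(s) + 2, IntegerType())
spark.sql('SELECT Country, slen(Country) AS slen FROM HVAC').show()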

Flexible Data Model

Spark accepts JSON files:

df = spark.read.json('../data/products.json')

To access nested fields:

df.select(df['dimensions.height']).show()
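printSchema() (see below) shows how the nested JSON structure was inferred, and a nested field can be flattened into a top-level column with alias(); a sketch:

# Inspect the inferred nested structure, then flatten one field
df.printSchema()
df.select(df['dimensions.height'].alias('height')).show()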

Other functions of SparkSQL

Print the dataframe schema in a tree format

df.printSchema()

Output:

root
 |-- BuildingID: integer (nullable = true)
 |-- BuildingMgr: string (nullable = true)
 |-- BuildingAge: integer (nullable = true)
 |-- HVACproduct: string (nullable = true)
 |-- Country: string (nullable = true)

Remove duplicates

dfProduct.select('Color').distinct().count()
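To drop duplicate rows instead of just counting distinct values, dropDuplicates() can be used, optionally restricted to a subset of columns; a sketch:

# Keep only one row for each distinct Color value
dfProduct.dropDuplicates(['Color']).select('Color').show()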

Rename columns

withColumnRenamed

dfDetail.select('*', (dfDetail.UnitPrice * dfDetail.OrderQty
                      * (1 - dfDetail.UnitPriceDiscount)).alias('netprice'))\
    .groupBy('SalesOrderID').sum('netprice') \
    .withColumnRenamed('sum(netprice)', 'TotalPrice')\
    .where('TotalPrice > 10000')\
    .show()
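The rename step can be avoided by giving the aggregate an alias directly inside agg(); a sketch of the same query:

from pyspark.sql import functions as F

dfDetail.select('*', (dfDetail.UnitPrice * dfDetail.OrderQty
                      * (1 - dfDetail.UnitPriceDiscount)).alias('netprice'))\
    .groupBy('SalesOrderID')\
    .agg(F.sum('netprice').alias('TotalPrice'))\
    .where('TotalPrice > 10000')\
    .show()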

Conflicting data types

Row fields whose types are incompatible with those of earlier rows are turned into nulls.

row1 = Row(name="Alice", age=11)
row2 = Row(name="Bob", age='12')
rdd_rows = sc.parallelize([row1, row2])
df1 = spark.createDataFrame(rdd_rows)
df1.show()
# Output
+-----+----+
| name| age|
+-----+----+
|Alice|  11|
|  Bob|null|
+-----+----+
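One way to avoid these silent nulls is to declare the schema explicitly, so incompatible values fail fast at creation time instead of being nulled; a sketch with the same two people, casting the string age up front:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
])
# With verifySchema (the default), an un-cast string age would raise an error here
df2 = spark.createDataFrame([('Alice', 11), ('Bob', int('12'))], schema)
df2.show()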