PySpark 05 - Spark SQL
DataFrames
- Idea borrowed from pandas and R
- A DataFrame can be thought of as an RDD of Row objects.
- The fields in a Row can be accessed like attributes.
Create a DataFrame
A DataFrame can be created from Row objects:
from pyspark.sql import Row
Fields of a Row can be accessed through row.name or row['name']. In some cases, row['name'] should be preferred to avoid conflicts with the Row object's own attributes; for example, row.count returns the built-in count method rather than the count field.
row = Row(name="Alice", age=11, count=1)
Output of row.count:
<built-in method count of Row object at 0x7fb8006ae9f0>
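A small, hedged sketch tying this together (it assumes an existing SparkSession named spark; the Bob row is made up for illustration):

from pyspark.sql import Row

row = Row(name="Alice", age=11, count=1)
print(row['count'])   # 1 -- the field value
print(row.count)      # the built-in count method -- shadowed by Row's own attribute

# Build a DataFrame from a list of Rows
people = spark.createDataFrame([Row(name="Alice", age=11), Row(name="Bob", age=12)])
people.show()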
A DataFrame can also be constructed from files:
df = spark.read.csv('../data/building.csv', header=True, inferSchema=True)
The show() method plays a role similar to head() in pandas: by default, it prints the first 20 rows of the DataFrame.
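The row count and truncation are optional arguments, e.g.:

df.show()                     # first 20 rows by default
df.show(5, truncate=False)    # first 5 rows, without truncating long values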
Create an RDD from the dataframe
dfrdd = df.rdd
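Each element of the resulting RDD is a Row object; for example (the HVACProduct column name is assumed from building.csv):

dfrdd.take(2)                                   # a list of Row objects
dfrdd.map(lambda r: r['HVACProduct']).take(2)   # fields are accessed like Row fields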
Operations of SparkSQL
Select
# The following two lines have the same effect
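The snippet above is truncated; a minimal sketch of two equivalent select calls, using the HVACProduct column from building.csv:

df.select('HVACProduct').show()
df.select(df['HVACProduct']).show()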
Where (Same as Filter)
# The following two lines have the same effect
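This snippet is also cut off; a minimal sketch of two equivalent filters (where is an alias of filter; the BuildingAge column is taken from the SQL example later in these notes):

df.where(df['BuildingAge'] >= 10).show()
df.filter("BuildingAge >= 10").show()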
GroupBy
df.groupBy('HVACProduct').count().show()
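Beyond count(), other aggregations can be attached to the grouped data; a small sketch (averaging the assumed BuildingAge column):

from pyspark.sql.functions import avg

df.groupBy('HVACProduct').agg(avg('BuildingAge').alias('AvgBuildingAge')).show()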
OrderBy
dfProduct.filter("Color = 'Black'")\
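The block above is cut off; a plausible completion, where the ListPrice column name is an assumption:

dfProduct.filter("Color = 'Black'") \
         .orderBy('ListPrice', ascending=False) \
         .show()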
The Closure Feature of SparkSQL
Consider the following code:
d1 = dfDetail.join(dfProduct, 'ProductID') \
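The block is truncated here. A plausible reconstruction of the scenario described below (the selected column names are assumptions; the key point is that d1 does not keep Color):

d1 = dfDetail.join(dfProduct, 'ProductID') \
             .select('ProductID', 'OrderQty', 'UnitPrice')   # Color is not among the selected columns
d2 = d1.filter("Color = 'Black'")                            # filters on a column d1 no longer exposes
d2.show()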
Theoretically, after executing select on d1, the column 'Color' no longer exists, yet this code runs correctly. The reason is lazy execution: when d2 = d1.filter("Color = 'Black'") is evaluated, the computation actually starts from reading the original CSV files, and the optimizer moves operations like filter to an early stage of the plan, since filtering early reduces the size of the DataFrames that need to be processed.
After adding d1.persist() before the d2 = d1.filter(...) line, the code raises an error saying the column "Color" can no longer be resolved.
However, in some cases this closure-like behavior acts differently.
x = 5
No matter whether unpersist is used to release the DataFrames from the cache, the outputs produced before and after x is reassigned are still the same. That's because when df is filtered into df1, the current value of x is treated by the Catalyst optimizer as a constant, and Spark applies that filter very early to speed up the following computations. Only by calling filter again after x has been changed does Spark build a plan that uses x = 3.
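A minimal sketch of the behavior just described (the original snippet is truncated; the BuildingAge column is an assumption):

x = 5
df1 = df.filter(df['BuildingAge'] > x)   # the current value of x (5) is baked into df1's plan as a literal
df1.count()                              # computed with BuildingAge > 5
x = 3
df1.count()                              # still computed with BuildingAge > 5; df1's plan did not change
df2 = df.filter(df['BuildingAge'] > x)   # only a new filter() call picks up x = 3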
Embed SQL queries
SQL queries can be run together with the Spark SQL DataFrame APIs.
Create a temporary view to refer to in queries
df.createOrReplaceTempView('HVAC')
Using SQL queries instead of SparkSQL APIs
spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10').show()
Mix dataframe API with SQL
# Can even mix DataFrame API with SQL:
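The rest of this snippet is missing; one hedged sketch of what mixing the two styles can look like, reusing the HVAC view registered above:

spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10') \
     .groupBy('HVACProduct') \
     .count() \
     .show()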
User-Defined Function (UDF)
from pyspark.sql.functions import udf
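The rest of the example is not shown; a minimal sketch of defining and applying a UDF (the Name column of dfProduct is an assumption):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# A UDF that returns the length of a string column
name_len = udf(lambda s: len(s) if s is not None else None, IntegerType())

dfProduct.select('Name', name_len('Name').alias('NameLength')).show()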
Flexible Data Model
Spark accepts JSON files:
df = spark.read.json('../data/products.json')
To access nested fields:
df.select(df['dimensions.height']).show()
Other functions of SparkSQL
Print the dataframe schema in a tree format
df.printSchema()
Output:
root
Remove duplicates
dfProduct.select('Color').distinct().count()
Rename columns
withColumnRenamed
dfDetail.select('*', (dfDetail.UnitPrice * dfDetail.OrderQty
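The snippet is cut off; a hedged completion showing both ways to name columns (the LineTotal and Quantity names are assumptions):

# Name a computed column with alias()
dfDetail.select('*', (dfDetail.UnitPrice * dfDetail.OrderQty).alias('LineTotal')).show()

# Rename an existing column with withColumnRenamed()
dfDetail.withColumnRenamed('OrderQty', 'Quantity').show()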
Conflicting data types
Row fields whose types are incompatible with those of previous rows will be turned into nulls
row1 = Row(name="Alice", age=11)
# Output