PySpark 02 - Closure and Persistence
1 Closure
A task’s closure is those variables and methods which must be visible for the executor to perform its computations on the RDD.
- Functions that run on RDDs at executors
- Any global variables used by those functions
The variables within the closure sent to each executor are copies.
This closure is serialized and sent to each executor from the driver when an action is invoked.
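The same copy semantics can be sketched with plain Python multiprocessing, no Spark required (`bump` and the pool size are illustrative choices): worker processes, like executors, receive copies of the parent's variables, so their updates never reach the parent.

```python
from multiprocessing import Pool

counter = 0

def bump(x):
    global counter
    counter += x  # mutates this worker process's copy only
    return x

if __name__ == "__main__":
    with Pool(2) as p:
        p.map(bump, range(10))
    print(counter)  # the parent's counter is untouched: 0
```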
For example:

```python
counter = 0
rdd = sc.parallelize(range(10))

def increment(x):
    global counter
    counter += x  # updates a copy of counter shipped to the executor

rdd.foreach(increment)
print(rdd.collect())
print(counter)  # still 0 on the driver
```

The output will be:

```
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
0
```
Accumulators
Accumulators are variables that are only “added” to through an associative and commutative operation. An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `add` method or the `+=` operator, but only the driver program can read the accumulator's value, via its `value` property.
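The associative-and-commutative requirement exists because partial updates arrive from partitions in no fixed order or grouping; a standalone Python sketch (no Spark needed) shows why addition qualifies:

```python
from functools import reduce
import random

data = list(range(10))
random.shuffle(data)  # tasks may finish in any order

# Addition is associative and commutative, so neither the order
# nor the grouping of the partial updates changes the final value.
total = reduce(lambda acc, x: acc + x, data)
print(total)  # 45 regardless of the shuffle
```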
```python
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
```

The output will be:

```
45
```
The workers are never allowed to read the value of `accum`, so `accum` cannot appear on the right-hand side of `+=`. Only `+=` (or the `add` method) can be used; plain `+` and `=` lead to errors.
Accumulators are rarely the best tool: most of what they do can be accomplished more simply with other operations. For example, the code above can be replaced with `.sum()` or `.reduce()`.
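What `.sum()` and `.reduce()` do can be sketched in plain Python (the partition layout below is hypothetical): each partition is reduced on its executor, and the partial results are then combined on the driver.

```python
from functools import reduce
from operator import add

partitions = [[0, 1, 2], [3, 4], [5, 6, 7, 8, 9]]  # hypothetical split of range(10)

partials = [reduce(add, part) for part in partitions]  # per-partition reduce (executors)
total = reduce(add, partials)                          # combine partials (driver)
print(total)  # 45, the same result as rdd.sum()
```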
2 Example: Computing Pi using Monte Carlo simulation
```python
# From the official Spark examples.
import random

NUM_SAMPLES = 1000000

def inside(p):
    x, y = random.random(), random.random()
    return x * x + y * y < 1

count = sc.parallelize(range(NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
```
`glom()`: turn an RDD into an RDD of lists, with one list per partition:

```python
# Example: glom
rdd = sc.parallelize(range(100), 4)
print(rdd.collect())         # all elements in one flat list
print(rdd.glom().collect())  # the same elements grouped per partition:
                             # [[0, ..., 24], [25, ..., 49], [50, ..., 74], [75, ..., 99]]
```

The first `print` gives:

```
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
```
Inspecting the samples with `glom()` shows that the random numbers in different partitions are identical. In that case, adding more partitions and more samples cannot improve the accuracy of the Monte Carlo simulation.
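The underlying effect can be reproduced in plain Python (no Spark): generators that start from the same state emit identical streams, while distinct per-partition seeds make them diverge.

```python
import random

def stream(seed, n=3):
    rng = random.Random(seed)  # an independent generator, like one partition's RNG
    return [rng.random() for _ in range(n)]

print(stream(0) == stream(0))  # True: identical state, identical samples
print(stream(0) == stream(1))  # False: distinct seeds give distinct samples
```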
How to fix it: introduce `mapPartitions()` and `mapPartitionsWithIndex()`:
```python
# Example: mapPartitions and mapPartitionsWithIndex
rdd = sc.parallelize(range(8), 4)

def sum_partition(it):
    # receives the whole partition as an iterator
    yield sum(it)

def tag_partition(index, it):
    # additionally receives the partition's index
    yield (index, list(it))

print(rdd.mapPartitions(sum_partition).collect())          # [1, 5, 9, 13]
print(rdd.mapPartitionsWithIndex(tag_partition).collect()) # [(0, [0, 1]), (1, [2, 3]), (2, [4, 5]), (3, [6, 7])]
```
Therefore, with `mapPartitionsWithIndex`, each partition can seed its own random generator so the partitions draw different samples:

```python
# Correct version for computing Pi
import random

NUM_PARTITIONS = 8
SAMPLES_PER_PARTITION = 100000

def sample_pi(index, it):
    random.seed(index)  # a distinct seed per partition
    hits = 0
    for _ in range(SAMPLES_PER_PARTITION):
        x, y = random.random(), random.random()
        if x * x + y * y < 1:
            hits += 1
    yield hits

count = (sc.parallelize(range(NUM_PARTITIONS), NUM_PARTITIONS)
           .mapPartitionsWithIndex(sample_pi)
           .sum())
print("Pi is roughly %f" % (4.0 * count / (NUM_PARTITIONS * SAMPLES_PER_PARTITION)))
```