PySpark 03 - Use take() instead of Collect()
PySpark 03 - Use take() instead of Collect()
This note discuss the mistake that can caused by “Lazy execution” and global variables.
The following code is the implementation of Linear-select problem.
Problem:
- Input: an array A of n numbers (unordered), and k.
- Output: the k-th smallest number (counting from 0).
Algorithm:
- x = A[0]
- partition A into A[0…mid-1] < A[mid] = x < A[mid+1…n-1]
- if mid = k then return x
- if k < mid then A = A[0…mid-1] , if k > mid then A = A[mid+1,n-1], k = k – mid – 1
- go to step 1
1 | data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26] |
However, due to the lazy execution mechanism, consider the 2nd iteration of “while”: the x that workers see is correct. But the whole calculation need to start from the very beginning A = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]. This leads to a wrong answer = 67.
Note: In spark 3.x, it automatically cache RDD A in every iteration. So even theoretically it should output the wrong answer 67, it actually can get the correct answer 43. To reproduce this “bug”, use
A.unpersist()
to manually unpersist A.
Similar “bug” appears in the following code:
1 | A = sc.parallelize(range(10)) |
The output is “5” and “3” due to the lazy execution. This may lead to an unexpected answer when a global variable is changed unintended.
However, collect()
acts wierd:
1 | A = sc.parallelize(range(10)) |
While the output should be “[0, 1, 2, 3, 4]” and “[0, 1, 2]”. The actual output is:
1 | [0, 1, 2, 3, 4] |
Therefore, to get the answer within expectation, use take()
instead of collect()