PySpark 04 - Key Value Pairs

While most Spark operations work on RDDs containing objects of any type, a few special operations are only available on RDDs of key-value pairs.

For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

lines = sc.textFile("README.md")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

More examples of key-value pair operations

reduceByKey()

# reduceByKey
# fruits is assumed to be an RDD of fruit name strings, e.g.:
fruits = sc.parallelize(["apple", "banana", "orange", "kiwi", "mango", "plum"])
# count how many fruit names there are of each length
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
print(numFruitsByLength.take(10))

sortByKey()

from operator import add

lines = sc.textFile('../data/course.txt')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
print(counts.sortByKey().take(20))

sortBy()

# False => descending order
print(counts.sortBy(lambda x: x[1], False).take(20))

join()

# Join simple example

products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
# join() operates on (key, value) pairs, so the amount and status of each transaction
# are packed together into a single value instead of being kept as a flat 3-tuple:
# trans = sc.parallelize([(1, 134, "OK"), (3, 34, "OK"), (5, 162, "Error"), (1, 135, "OK"), (2, 53, "OK"), (1, 45, "OK")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

# each result has the form (product id, (product name, (amount, status)))
print(products.join(trans).take(20))

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine (not on each task), rather than shipping a copy of it with every task closure. This is more efficient when many tasks need the same data.
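For example, a small lookup table can be broadcast once with sc.broadcast() and then read inside tasks through its .value attribute. A minimal sketch (the table and transaction data here are made up for illustration):

# broadcast a read-only lookup table to every executor once
productNames = sc.broadcast({1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"})

trans = sc.parallelize([(1, 134), (3, 34), (5, 162), (1, 135)])

# tasks read the cached copy via .value instead of capturing the dict in the closure
named = trans.map(lambda t: (productNames.value.get(t[0], "Unknown"), t[1]))
print(named.collect())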

Example: K-means Clustering

The algorithm:

  1. Choose k points from the input points randomly. These points represent initial group centroids.
  2. Assign each point to the closest centroid.
  3. When all points have been assigned, recalculate the positions of the k centroids.
  4. Repeat Steps 2 and 3 until the centroids no longer move. This produces a separation of the objects into groups from which the metric to be minimized can be calculated.
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split()])

def closestPoint(p, centers):
    # return the index of the center closest to point p
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

lines = sc.textFile('../data/kmeans_data.txt', 5)

K = 3
convergeDist = 0.01
# terminate the algorithm when the total distance from the old centers to the new centers is less than this value

data = lines.map(parseVector).cache()  # data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0  # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))

    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    # (array of sums of coordinates, total number of points assigned))

    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers

    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from old centers to new centers

    for (i, p) in newCenters:
        kCenters[i] = p

print("Final centers: ", kCenters)

Example: PageRank

The algorithm:

  1. Initialize all PR’s to 1
  2. Iteratively compute

$$PR(u) \leftarrow 0.15 \times \frac{1}{N} + 0.85 \times \sum_{v \to u} \frac{PR(v)}{\mathrm{outdegree}(v)}$$

import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a urls pair string into a urls pair.
    parts = urls.split(' ')
    return parts[0], parts[1]

# Loads the input file. It should be in the format:
# URL neighbor URL
# URL neighbor URL
# URL neighbor URL
# ...

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("../data/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from the input file and groups their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()

# Initializes the rank of each URL to one.
ranks = links.mapValues(lambda neighbors: 1.0)

# Calculates and updates URL ranks iteratively using the PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))

    # Re-calculates URL ranks based on neighbor contributions.
    # ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    ranks = contribs.reduceByKey(add).map(lambda t: (t[0], t[1] * 0.85 + 0.15))

print(ranks.top(5, lambda x: x[1]))