Broadcast variables allow the programmer to keep a read-only variable cached on each machine (not each task). This is more efficient than shipping a copy of the variable inside the closure sent with every task.
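As a minimal sketch of the API (assuming an existing SparkContext sc; the lookup table here is a hypothetical stand-in for any read-only data), the variable is broadcast once and then read through its .value attribute inside tasks:

lookupTable = {0: 'a', 1: 'b', 2: 'c'}   # hypothetical read-only data
bcTable = sc.broadcast(lookupTable)      # cached once per machine, not per task
mapped = sc.parallelize([0, 1, 2, 1]).map(lambda k: bcTable.value[k])
print(mapped.collect())                  # ['a', 'b', 'c', 'b']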
Example: K-means Clustering
The algorithm:
1. Choose k points from the input points at random. These points represent the initial group centroids.
2. Assign each point to the closest centroid.
3. When all points have been assigned, recalculate the positions of the k centroids.
4. Repeat Steps 2 and 3 until the centroids no longer move. This produces a separation of the objects into groups from which the metric to be minimized can be calculated.
import numpy as np

def parseVector(line):
    # Parses a line of whitespace-separated numbers into a NumPy array.
    return np.array([float(x) for x in line.split()])
def closestPoint(p, centers):
    # Returns the index of the center closest to p (by squared Euclidean distance).
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex
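A quick local sanity check of closestPoint, using hypothetical sample values:

centers = [np.array([0.0, 0.0]), np.array([5.0, 5.0])]
print(closestPoint(np.array([4.0, 4.5]), centers))  # prints 1: (4, 4.5) is closer to (5, 5)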
lines = sc.textFile('../data/kmeans_data.txt', 5)
K = 3
convergeDist = 0.01  # terminate when the total distance from the old centers to the new centers is less than this value
data = lines.map(parseVector).cache() # data is an RDD of arrays
kCenters = data.takeSample(False, K, 1)  # initial centers: K points sampled without replacement (seed = 1)
tempDist = 1.0  # total distance from old centers to new centers
while tempDist > convergeDist:
    # For each point in data, find its closest center;
    # closest is an RDD of tuples (index of closest center, (point, 1)).
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # pointStats is an RDD of tuples (index of center,
    # (sum of the coordinate arrays, total number of points assigned)).
    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # Compute the new centers.
    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # Compute the total distance from the old centers to the new centers.
    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    for (i, p) in newCenters:
        kCenters[i] = p

print("Final centers: ", kCenters)
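Since every task reads kCenters in each iteration, it is a natural candidate for a broadcast variable. A sketch of the change, keeping the names above (only the first lines of the loop body differ):

while tempDist > convergeDist:
    bcCenters = sc.broadcast(kCenters)  # ship the current centers to each machine once per iteration
    closest = data.map(lambda p: (closestPoint(p, bcCenters.value), (p, 1)))
    # ... rest of the loop body unchanged ...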
Example: PageRank

from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the ranks of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)
def parseNeighbors(urls):
    # Parses a "URL neighbor-URL" pair string into a tuple of two URLs.
    parts = urls.split(' ')
    return parts[0], parts[1]
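A quick check with a hypothetical input line:

print(parseNeighbors("url1 url2"))  # ('url1', 'url2')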
# Loads the input file. It should be in the format:
#   URL  neighbor-URL
#   URL  neighbor-URL
#   URL  neighbor-URL
#   ...
# The data files can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("../data/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)
numOfIterations = 10
# Loads all URLs from the input file and groups each URL with the list of its neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()
# Initializes the rank of every URL to one.
ranks = links.mapValues(lambda neighbors: 1.0)
# Calculates and updates URL ranks iteratively using the PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the ranks of other URLs.
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor URLs, rank)).
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    # Re-calculates URL ranks based on neighbor contributions;
    # 0.85 is the damping factor.
    # ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    ranks = contribs.reduceByKey(add).map(lambda t: (t[0], t[1] * 0.85 + 0.15))
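To inspect the result, the final ranks can be collected and printed after the loop (a short sketch; collect() brings the whole RDD to the driver, which is fine for a small data set):

for (link, rank) in ranks.collect():
    print("%s has rank: %s" % (link, rank))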