Spark: Job Scheduling

Operations on RDDs

Narrow Dependencies and Wide Dependencies

HDFS files: Input RDD, one partition for each block of the file.

Map: Transforms each record of the RDD.

Filter: Select a subset of records.

Union: Returns the union of two RDDs.

Join: Narrow dependency if both parent RDDs are partitioned using the same partitioner, wide dependency otherwise.

spark05.png

The scheduler examines the RDD’s lineage graph to build a DAG of stages. Stage boundaries fall at shuffles (wide dependencies). Within one stage, operations with narrow dependencies are pipelined and executed in parallel across partitions.

image-20240310142849900.png
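As a rough illustration of how shuffles cut a lineage graph into stages, here is a toy model in plain Python. It is not Spark's scheduler: the RDD names, the `lineage` dictionary, and `build_stages` are all made up for this sketch, and it simply groups RDDs that are connected through narrow dependencies.

```python
# Toy lineage graph: each RDD maps to a list of (parent, dependency_type).
# The RDD names and the graph shape are hypothetical.
lineage = {
    "lines": [],
    "words": [("lines", "narrow")],     # e.g. map
    "filtered": [("words", "narrow")],  # e.g. filter
    "counts": [("filtered", "wide")],   # e.g. reduceByKey, needs a shuffle
    "top": [("counts", "narrow")],      # e.g. filter
}


def build_stages(lineage):
    """Group RDDs linked by narrow dependencies; wide edges separate groups."""
    root = {rdd: rdd for rdd in lineage}  # simple union-find structure

    def find(x):
        while root[x] != x:
            root[x] = root[root[x]]  # path compression
            x = root[x]
        return x

    for rdd, deps in lineage.items():
        for parent, dep_type in deps:
            if dep_type == "narrow":
                root[find(rdd)] = find(parent)  # same stage as the parent

    stages = {}
    for rdd in lineage:
        stages.setdefault(find(rdd), []).append(rdd)
    return sorted(sorted(members) for members in stages.values())


stages = build_stages(lineage)
for members in stages:
    print(members)
```

In this sketch the single wide edge (into `counts`) yields two stages: one containing `lines`, `words`, and `filtered`, and one containing `counts` and `top`.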

Shuffle operations

Spark uses shuffles to implement wide dependencies. Examples: reduceByKey, repartition, coalesce, and join (on RDDs not partitioned using the same partitioner). For a shuffle, Spark generates two sets of tasks: map tasks to organize the data, and reduce tasks to group or aggregate it. Internally, Spark builds a hash table within each task to perform the grouping. If a hash table is too large to fit in memory, Spark spills it to disk, incurring the additional overhead of disk I/O. Spark automatically persists the intermediate data produced by a shuffle, even if the user never calls persist.
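The map/reduce split described above can be sketched in plain Python. This is a simplified model, not Spark's implementation: `map_task`, `reduce_task`, and `reduce_by_key` are hypothetical helper names, everything stays in memory, and nothing is spilled to disk.

```python
from collections import defaultdict


def map_task(records, num_reducers):
    """Map side: bucket (key, value) records by the hash of their key."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[hash(key) % num_reducers].append((key, value))
    return buckets


def reduce_task(records, combine):
    """Reduce side: aggregate values per key using an in-memory hash table."""
    table = {}
    for key, value in records:
        table[key] = combine(table[key], value) if key in table else value
    return table


def reduce_by_key(partitions, combine, num_reducers=2):
    """Run one map task per input partition, then one reduce task per bucket."""
    map_outputs = [map_task(p, num_reducers) for p in partitions]
    result = {}
    for r in range(num_reducers):
        # "Shuffle": reducer r fetches bucket r from every map output.
        incoming = [kv for out in map_outputs for kv in out.get(r, [])]
        result.update(reduce_task(incoming, combine))
    return result


partitions = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
totals = reduce_by_key(partitions, lambda x, y: x + y)
print(totals)
```

Because all records with the same key hash to the same bucket, each key is aggregated by exactly one reduce task.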

Example: join order optimization

Suppose we have three tables:

  • Waybills (waybill, customer). Partitioned by waybill.
  • Customers (customer, phone). Partitioned by customer.
  • Waybill_status (waybill, version). Partitioned by waybill.

To join them together:

# We want to join 3 tables together.
# Knowing how each table is partitioned helps optimize the join order.

# waybills.join(customers, 'customer').join(waybill_status, 'waybill').show()
waybills.join(waybill_status, 'waybill').join(customers, 'customer').show()

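The uncommented join order (line 5 of the snippet) is more efficient than the commented-out order (line 4). Waybills and Waybill_status are both partitioned by waybill, so joining them first needs no shuffle, and only the final join with Customers requires one. In the line 4 order, Waybills must be shuffled by customer for the first join, and the result must then be shuffled again by waybill for the second join.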
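The shuffle counts for the two orders can be checked with a small cost model. This is only a sketch under simplifying assumptions: a table is represented as a hypothetical `(partition_key, shuffles_so_far)` pair, a side of a join is shuffled exactly when it is not already partitioned by the join key, and the join result is partitioned by the join key. Real Spark planning also depends on partition counts, partitioners, and table sizes.

```python
def join(left, right, key):
    """Join two (partition_key, shuffles_so_far) pairs on `key`.

    Each side that is not already partitioned by `key` costs one shuffle.
    The result is assumed to be partitioned by `key`.
    """
    left_key, left_cost = left
    right_key, right_cost = right
    extra = int(left_key != key) + int(right_key != key)
    return (key, left_cost + right_cost + extra)


# Tables from the example, with their partitioning keys and no shuffles yet.
waybills = ("waybill", 0)
customers = ("customer", 0)
waybill_status = ("waybill", 0)

# Order from the commented-out line: customers first, then waybill_status.
order_a = join(join(waybills, customers, "customer"), waybill_status, "waybill")

# Order from the active line: waybill_status first, then customers.
order_b = join(join(waybills, waybill_status, "waybill"), customers, "customer")

print("customers first:", order_a[1], "shuffles")
print("waybill_status first:", order_b[1], "shuffles")
```

Under this model, joining with Customers first costs two shuffles, while joining with Waybill_status first costs one.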

Job Scheduling

Job: a Spark action and any tasks that need to run to evaluate that action.

Multiple parallel jobs can run simultaneously if they are submitted from separate threads.

By default, Spark’s scheduler runs jobs in FIFO fashion.

  • It is also possible to configure fair sharing between jobs, i.e., tasks are assigned to jobs in round-robin fashion.
  • Spark also supports scheduler pools.
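To make the difference between the two policies concrete, the following toy model orders the tasks of several jobs on a single task slot. It is an illustration only, not Spark's scheduler: `fifo_order` and `round_robin_order` are hypothetical helpers, and a real cluster has many slots, so under FIFO later jobs can still start early when the first job does not need every slot.

```python
from itertools import zip_longest


def fifo_order(jobs):
    """FIFO: run every task of the first job before any task of the next."""
    return [task for job in jobs for task in job]


def round_robin_order(jobs):
    """Fair sharing: take one task from each job in turn."""
    order = []
    for round_tasks in zip_longest(*jobs):
        order.extend(task for task in round_tasks if task is not None)
    return order


# A long job A submitted before a short job B (task names are made up).
jobs = [["A1", "A2", "A3"], ["B1"]]

print(fifo_order(jobs))
print(round_robin_order(jobs))
```

With round robin, the short job B finishes after the second task instead of waiting behind all of job A. In Spark itself, fair sharing is selected by setting the `spark.scheduler.mode` property to `FAIR`.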

Task Scheduling Within a Job

Each job consists of multiple stages.

A stage can only start after all its parent stages have completed.

Each stage has many tasks.

Spark assigns tasks to machines based on data locality. If a task needs to process a partition that is available in memory on a node, Spark sends the code of the task to that node.

Different levels of locality are used:

  • PROCESS_LOCAL: data is in the same JVM as the running code.
  • NODE_LOCAL: data is on the same node.
  • RACK_LOCAL: data is on the same rack of servers.
  • ANY: data is elsewhere on the network, not on the same rack.
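The locality levels above form a preference order. The sketch below classifies where a partition sits relative to an executor and picks the most local executor. It is a simplified model: the `jvm`, `node`, and `rack` fields and both helper functions are hypothetical, and it ignores the fact that Spark waits for a while (the `spark.locality.wait` setting) before falling back to a less local level.

```python
# Locality levels from best to worst.
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def locality(data, executor):
    """Classify how close `executor` is to the partition described by `data`."""
    if data["jvm"] == executor["jvm"]:
        return "PROCESS_LOCAL"
    if data["node"] == executor["node"]:
        return "NODE_LOCAL"
    if data["rack"] == executor["rack"]:
        return "RACK_LOCAL"
    return "ANY"


def pick_executor(data, executors):
    """Choose the executor with the best (lowest-index) locality level."""
    return min(executors, key=lambda e: LEVELS.index(locality(data, e)))


# A cached partition living in executor e1 on node n1, rack r1.
data = {"jvm": "e1", "node": "n1", "rack": "r1"}

# Free executors; e1 itself is assumed to be busy.
executors = [
    {"jvm": "e4", "node": "n3", "rack": "r2"},
    {"jvm": "e3", "node": "n2", "rack": "r1"},
    {"jvm": "e2", "node": "n1", "rack": "r1"},
]

best = pick_executor(data, executors)
print(best["jvm"], locality(data, best))
```

Here the task goes to `e2`, which shares a node with the data, rather than to `e3` (same rack only) or `e4` (different rack).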

Applications use memory in two ways:

  • Execution memory: for computation in shuffles, joins, sorts and aggregations.
  • Storage memory: for caching and propagating internal data across the cluster.

Execution and storage share a unified region (M). By default, M = 0.6 * (total memory available to the JVM - 300 MB). Execution may evict storage if necessary, but only until storage usage falls to a threshold R, so storage is guaranteed at least R, with default R = 0.5 * M. Storage may not evict execution.
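A quick worked example of these defaults, as a small Python calculation. The 4096 MB heap size is an arbitrary assumption for illustration, and `unified_memory` is a hypothetical helper; its two parameters mirror the defaults quoted above (0.6 and 0.5).

```python
RESERVED_MB = 300  # memory set aside before the unified region is computed


def unified_memory(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return (M, R) in MB for a given JVM heap size in MB."""
    m = memory_fraction * (heap_mb - RESERVED_MB)  # unified region M
    r = storage_fraction * m                       # protected storage R
    return m, r


m, r = unified_memory(4096)
print(f"M = {m:.1f} MB, R = {r:.1f} MB")
```

For a 4096 MB heap this gives M of about 2277.6 MB and R of about 1138.8 MB: execution can grow to use all of M when nothing is cached, but it cannot push cached data below R.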

Remarks:

Applications that do not use caching can use the entire memory space for execution.

Applications that do use caching can reserve a minimum storage space (R).