When you want to find the bottlenecks in your code on Apache Spark, you can dig through the detailed logs or query the REST API. sparkMeasure simplifies this logging, monitoring, and analysis work by integrating directly with your notebooks and application code.
In this post, I briefly show you how to use this package in your Apache Spark cluster.
Here, I used Azure Databricks.
To run the scripts in this post, please install the following packages on your cluster.
ch.cern.sparkmeasure:spark-measure_2.11:0.15 (Scala/JVM package) from the Maven repository
sparkmeasure (PySpark wrapper package) from PyPI
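On recent Databricks runtimes you can also install the Python wrapper as a notebook-scoped library (a minimal sketch; the Maven JAR above still needs to be attached through the cluster's Libraries UI, and the Scala version in the coordinate should match your cluster's):

%pip install sparkmeasure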
Before starting, let me briefly explain Jobs, Stages, and Tasks in Apache Spark.
First of all, when you request some work from Apache Spark (such as submitting an application or running a notebook cell), Spark Jobs are submitted. A Job is the starting point for distributed execution in Spark.
Each Job consists of several physical execution steps called Stages. Each Stage in turn runs multiple Tasks, which are the real execution units, one per partition of the underlying RDD.
The diagram below outlines this Job/Stage/Task hierarchy.
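As a concrete code illustration (a minimal sketch, not taken from the original post), the following action submits one Job; the shuffle introduced by reduceByKey splits it into 2 Stages, and each Stage runs one Task per partition:

# A shuffle (reduceByKey) forces a stage boundary; collect() is the action
# that actually submits the job.
rdd = spark.sparkContext.parallelize(range(1000000), 8)  # 8 partitions -> 8 tasks per stage
counts = (rdd.map(lambda x: (x % 10, 1))
             .reduceByKey(lambda a, b: a + b)  # shuffle: the job splits into 2 stages
             .collect())                       # action: 1 job is submitted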
Ease of Measurement with “sparkMeasure”
In this post I measure the following line of code, which loads 420 MB of data from a CSV file and generates a Spark dataframe. (Please see here for the original notebook.)
df = (sqlContext.read.format("csv")
    .option("header", "true")
    .option("nullValue", "NA")
    .option("inferSchema", True)
    .load("abfss://email@example.com/flight_weather.csv"))
First we measure “Stage” metrics as follows.
To measure Stage metrics, first create a sparkmeasure.StageMetrics object.

from sparkmeasure import StageMetrics
metricStage = StageMetrics(spark)
Now we measure the previous code using runandmeasure(). With this method, the specified code (the 2nd argument of runandmeasure() below) is run in this context, and its metrics are collected.

metricStage.runandmeasure(
    locals(),
    'df = sqlContext.read.format("csv").option("header", "true").option("nullValue", "NA").option("inferSchema", True).load("abfss://firstname.lastname@example.org/flight_weather.csv")')
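If you prefer not to pass your code as a string, the Python wrapper also exposes an explicit begin/end style (a sketch based on sparkMeasure's documented API; print_report() writes an aggregated summary to the cell output):

metricStage.begin()
df = (sqlContext.read.format("csv")
    .option("header", "true")
    .option("nullValue", "NA")
    .option("inferSchema", True)
    .load("abfss://firstname.lastname@example.org/flight_weather.csv"))
metricStage.end()
metricStage.print_report()  # aggregated stage metrics for the code between begin() and end()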
After the measurement, you will find a temporary table named “perfstagemetrics”. (This table is temporary, so it will disappear when you restart your cluster.)
%sql show tables
Now let’s see the schema of the “perfstagemetrics” table. You will find a lot of metrics in this table, as shown below.
%sql desc formatted perfstagemetrics
Here we see the metric results with the following columns.
You will find that there are 2 jobs and 2 corresponding stages, in which stage 0 (stageId=0) is a kind of provisioning step and stage 1 (stageId=1) contains the main loading tasks.
%sql
select
  jobId,
  stageId,
  timestamp(from_unixtime(submissionTime/1000,'yyyy-MM-dd HH:mm:ss')) as submissionTime,
  timestamp(from_unixtime(completionTime/1000,'yyyy-MM-dd HH:mm:ss')) as completionTime,
  numTasks,
  recordsRead,
  recordsWritten,
  executorRunTime,
  executorCpuTime,
  executorDeserializeTime
from perfstagemetrics
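If you prefer an aggregated summary over the raw rows, the wrapper can also build that DataFrame for you (a sketch; aggregate_stagemetrics_DF is part of the sparkMeasure Python API, though availability may depend on the package version):

# Aggregate the collected stage metrics over the temporary view.
aggDF = metricStage.aggregate_stagemetrics_DF("PerfStageMetrics")
aggDF.show(truncate=False)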
Now let’s go into more detail. Next we measure “Task” metrics as follows.
Just as with “Stage” metrics, create a sparkmeasure.TaskMetrics object and run runandmeasure() on the target code.

from sparkmeasure import TaskMetrics
metricTask = TaskMetrics(spark)
metricTask.runandmeasure(
    locals(),
    'df = sqlContext.read.format("csv").option("header", "true").option("nullValue", "NA").option("inferSchema", True).load("abfss://email@example.com/flight_weather.csv")')
After completion, you will find another temporary table named “perftaskmetrics” with the following columns.
%sql show tables
%sql desc formatted perftaskmetrics
Let’s look at the metric results with the following columns.
You can easily see that the rows are split evenly across the tasks (each reading 320,000 rows) running in parallel, and that each task takes around 6,000 ms for loading.
%sql
select
  jobId,
  stageId,
  host,
  timestamp(from_unixtime(launchTime/1000,'yyyy-MM-dd HH:mm:ss')) as launchTime,
  timestamp(from_unixtime(finishTime/1000,'yyyy-MM-dd HH:mm:ss')) as finishTime,
  recordsRead,
  recordsWritten,
  executorRunTime,
  executorCpuTime,
  executorDeserializeTime
from perftaskmetrics
order by index
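To check that balance programmatically rather than by eye (a hypothetical check, not part of the original notebook), you can summarize the task rows per host:

# Hypothetical skew check over the temporary table created by TaskMetrics:
# even recordsRead and avgRunTimeMs across hosts means no stragglers.
spark.sql("""
  select host,
         count(*)             as tasks,
         sum(recordsRead)     as recordsRead,
         avg(executorRunTime) as avgRunTimeMs
  from perftaskmetrics
  group by host
  order by host
""").show(truncate=False)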
Since you can see these metrics without leaving your notebook, sparkMeasure helps you immediately identify which line is the bottleneck.
Once you have identified the stage or task, you can look for the root cause of the bottleneck using the Spark UI (for execution plans) or Ganglia metrics (for resource saturation: RAM, CPU, and so on).
After you have found the root cause and improved your code, you can quickly check how the change affects performance by running sparkMeasure again. You will repeat this cycle throughout your experimentation.
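Because the temporary tables disappear on cluster restart, one way to keep before/after comparisons across experiments (a sketch; the run label and output path are hypothetical, and create_stagemetrics_DF is assumed from the sparkMeasure Python API) is to persist each run's metrics with a label:

from pyspark.sql import functions as F

run_label = "baseline"  # hypothetical label; change per experiment
(metricStage.create_stagemetrics_DF("PerfStageMetrics")
    .withColumn("runLabel", F.lit(run_label))
    .write.mode("append")
    .json("/tmp/sparkmeasure/stage_metrics"))  # hypothetical output path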
GitHub – sparkMeasure