Identifying performance bottlenecks with sparkMeasure in Apache Spark

When you want to find the bottlenecks in your Apache Spark code, you can dig through the detailed logs or use the REST API.
sparkMeasure integrates with your notebooks and application code, and simplifies this logging, monitoring, and analysis work in Apache Spark.

In this post, I briefly show you how to use this package on your Apache Spark cluster.
Here I use Azure Databricks.

To run the scripts in this post, please install the following packages on your cluster.

  • ch.cern.sparkmeasure:spark-measure_2.11:0.15 (the Java/Scala package) from the Maven repository
  • sparkmeasure (the PySpark wrapper package) from PyPI

Before starting, let me briefly explain Jobs, Stages, and Tasks in Apache Spark.

First of all, when you request work in Apache Spark (such as submitting an application or running a notebook cell), Spark Jobs are submitted. A Job is the starting point for distributed execution in Spark.
Each Job consists of several physical execution phases called Stages. Each Stage then runs multiple Tasks, which are the real units of execution, one per Spark partition of the underlying RDD.

(An outline diagram of the Job / Stage / Task hierarchy appears here in the original post.)

Ease of Measurement with sparkMeasure

In this post I measure the following code, which loads 420 MB of data from a CSV file and generates a Spark dataframe. (Please see here for the original notebook.)

df = (sqlContext.read.format("csv")
  .option("header", "true")
  .option("nullValue", "NA")
  .option("inferSchema", True)
  .load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv"))

First we measure "Stage" metrics as follows.
To measure Stage metrics, create a sparkmeasure.StageMetrics object as preparation.

from sparkmeasure import StageMetrics
metricStage = StageMetrics(spark)

Now we measure the previous code using the runandmeasure() method.
With this method, the code you pass as the second argument to runandmeasure() is run in this context, and its metrics are collected.

metricStage.runandmeasure(globals(),
  'df = sqlContext.read.format("csv").option("header", "true").option("nullValue", "NA").option("inferSchema", True).load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv")')

After the measurement, you will find a temporary table named "perfstagemetrics". (This table is temporary, so it will disappear when you restart your cluster.)

show tables

Now let's see the schema of the "perfstagemetrics" table. You will find a lot of metrics in this table, as shown below.

desc formatted perfstagemetrics

Here we see the metric results with the following columns.
You will find that there are 2 jobs with 2 corresponding stages, in which stage 0 (stageId=0) is a kind of provisioning task and stage 1 (stageId=1) contains the main loading tasks.

select jobId, stageId, name,
  timestamp(from_unixtime(submissionTime/1000,'yyyy-MM-dd HH:mm:ss')) as submissionTime,
  timestamp(from_unixtime(completionTime/1000,'yyyy-MM-dd HH:mm:ss')) as completionTime,
  stageDuration, numTasks, recordsRead
from perfstagemetrics
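The submissionTime and completionTime columns hold epoch milliseconds, which is why the SQL above divides by 1000 before applying from_unixtime. The same conversion in plain Python (the millisecond value below is a made-up example, not from the actual measurement):

```python
from datetime import datetime, timezone

# sparkMeasure records times as epoch milliseconds; divide by 1000 to get
# epoch seconds before converting (hypothetical example value).
submission_ms = 1_546_300_800_000
submitted = datetime.fromtimestamp(submission_ms / 1000, tz=timezone.utc)
print(submitted.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-01-01 00:00:00
```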

Now let's go into details. Next we measure "Task" metrics as follows.
As with "Stage" metrics, create a sparkmeasure.TaskMetrics object and run runandmeasure() on the target code.

from sparkmeasure import TaskMetrics
metricTask = TaskMetrics(spark)

metricTask.runandmeasure(globals(),
  'df = sqlContext.read.format("csv").option("header", "true").option("nullValue", "NA").option("inferSchema", True).load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv")')

After completion, you will find another temporary table named “perftaskmetrics” with the following columns.

show tables

desc formatted perftaskmetrics

Let's see the metric results with the following columns.
You can easily see that the rows are divided evenly among the tasks (each task reads 320,000 rows) running in parallel, and that loading takes around 6,000 ms in each task.

select jobId, stageId, index,
  timestamp(from_unixtime(launchTime/1000,'yyyy-MM-dd HH:mm:ss')) as launchTime,
  timestamp(from_unixtime(finishTime/1000,'yyyy-MM-dd HH:mm:ss')) as finishTime,
  duration, recordsRead
from perftaskmetrics
order by index
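As a quick sanity check on those numbers, per-task throughput works out to roughly 53,000 rows per second (using the approximate figures quoted above):

```python
# Rough per-task throughput from the task metrics: each task reads about
# 320,000 rows in roughly 6,000 ms (approximate figures from this post).
rows_per_task = 320_000
duration_ms = 6_000
throughput = rows_per_task / (duration_ms / 1000)
print(int(throughput))  # 53333 rows/sec per task
```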


Since you can see these metrics without leaving your notebook, sparkMeasure helps you immediately identify which line is the bottleneck.
After you've identified the stage or task, you can look for the root causes of the bottleneck using the Spark UI for execution plans (see below) or Ganglia metrics for insufficient resources (RAM, CPU, and so on).
Once you've identified the root causes and made improvements, you can quickly check how they affect performance by using sparkMeasure again. You will repeat this cycle during your experimentation.

Spark UI

Ganglia UI


Reference:

GitHub – sparkMeasure

