
Identifying performance bottlenecks with sparkMeasure in Apache Spark

When you want to find the bottlenecks in your Apache Spark code, you can dig through the detailed logs or use the REST API.
sparkMeasure integrates with your notebooks and application code, and simplifies this logging, monitoring, and analysis work in Apache Spark.

In this post, I briefly show you how to use this package in your Apache Spark cluster.
Here, I used Azure Databricks.

To run the scripts in this post, please install the following packages on your cluster.

  • Install ch.cern.sparkmeasure:spark-measure_2.11:0.15 (JVM package) from the Maven repository
  • Install sparkMeasure (the PySpark wrapper package) from PyPI (a notebook-scoped alternative is sketched just below)
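As a side note, depending on your Databricks runtime version you may also be able to install the Python wrapper in a notebook-scoped way (the JVM package still has to be attached to the cluster). A minimal sketch, assuming a runtime that supports %pip:

%pip install sparkmeasure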

Before starting, let me briefly explain Jobs, Stages, and Tasks in Apache Spark.

First of all, when you request some work in Apache Spark (such as submitting an application or running a notebook cell), Spark Jobs are submitted. A Job is the entry point for distributed execution in Spark.
Each Job consists of several physical execution steps called Stages, and each Stage runs multiple Tasks, which are the actual execution units, one per partition of the underlying RDD.
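To see this hierarchy in action, here is a minimal sketch (the numbers and partition count are just for illustration, and it assumes a notebook where spark is already defined):

# Transformations such as map() are lazy; an action such as count() submits a Spark Job.
# The Job is split into Stages at shuffle boundaries, and each Stage runs one Task per partition.
rdd = spark.sparkContext.parallelize(range(1000000), numSlices=8)  # 8 partitions => 8 tasks per stage
total = rdd.map(lambda x: x * 2).count()  # action => a Job is submitted
print("partitions:", rdd.getNumPartitions())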

I illustrate this outline in the image below.

See the Ease of Measurement with “sparkMeasure”

In this post I measure the following code, which loads 420 MB of data from a CSV file and generates a Spark dataframe. (Please see here for the original notebook.)

df = (sqlContext.read.format("csv").
  option("header", "true").
  option("nullValue", "NA").
  option("inferSchema", True).
  load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv"))

First we measure “Stage” metrics.
To prepare, create a sparkmeasure.StageMetrics object as follows.

from sparkmeasure import StageMetrics
metricStage = StageMetrics(spark)

Now we measure the previous code using the runandmeasure() method.
This method runs the specified code (the 2nd argument of runandmeasure() below) in the given context, and collects its metrics while it executes.

metricStage.runandmeasure(
  locals(),
  'df = sqlContext.read.format("csv").option("header", "true").option("nullValue", "NA").option("inferSchema", True).load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv")')
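As an aside, if you prefer not to pass your code as a string, the Python wrapper also exposes an instrumentation-style API with begin(), end(), and print_report(). (These method names follow the sparkMeasure README and may differ slightly between versions, so please check the version you installed.) A sketch:

metricStage.begin()
df = (sqlContext.read.format("csv").
  option("header", "true").
  option("nullValue", "NA").
  option("inferSchema", True).
  load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv"))
metricStage.end()
metricStage.print_report()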

After the measurement, you can find a temporary table named “perfstagemetrics”. (Because this table is temporary, it disappears when you restart your cluster.)

%sql
show tables

Now let’s see the schema of the “perfstagemetrics” table. You will find a lot of metrics in this table, as shown below.

%sql
desc formatted perfstagemetrics

Here we look at the metric results with the following columns.
You will find that there are 2 jobs and 2 corresponding stages: stage 0 (stageId=0) is some kind of provisioning work, and stage 1 (stageId=1) contains the main loading tasks.

%sql
select
jobId,
stageId,
timestamp(from_unixtime(submissionTime/1000,'yyyy-MM-dd HH:mm:ss')) as submissionTime,
timestamp(from_unixtime(completionTime/1000,'yyyy-MM-dd HH:mm:ss')) as completionTime,
numTasks,
recordsRead,
recordsWritten,
executorRunTime,
executorCpuTime,
executorDeserializeTime
from perfstagemetrics
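You can also run the same kind of query from Python instead of a %sql cell. For example, the following small sketch sorts the stages by executor run time so that the heaviest stage comes first:

heaviest_stages = spark.sql("""
  select jobId, stageId, numTasks, executorRunTime, recordsRead
  from perfstagemetrics
  order by executorRunTime desc
""")
heaviest_stages.show()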

Now let’s go into more detail and measure “Task” metrics.
As with “Stage” metrics, you create a sparkmeasure.TaskMetrics object and run runandmeasure() on the target code.

from sparkmeasure import TaskMetrics
metricTask = TaskMetrics(spark)

metricTask.runandmeasure(
  locals(),
  'df = sqlContext.read.format("csv").option("header", "true").option("nullValue", "NA").option("inferSchema", True).load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv")')

After completion, you will find another temporary table named “perftaskmetrics” with the following columns.

%sql
show tables

%sql
desc formatted perftaskmetrics

Let’s look at the metric results with the following columns.
You can easily see that the rows are split across the tasks (about 320,000 rows each), which run in parallel, and that loading takes around 6,000 ms in each task.

%sql
select
jobId,
stageId,
host,
timestamp(from_unixtime(launchTime/1000,'yyyy-MM-dd HH:mm:ss')) as launchTime,
timestamp(from_unixtime(finishTime/1000,'yyyy-MM-dd HH:mm:ss')) as finishTime,
recordsRead,
recordsWritten,
executorRunTime,
executorCpuTime,
executorDeserializeTime
from perftaskmetrics
order by index
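Because skewed tasks are a common cause of slow stages, it can also help to compare the slowest task with the average for each stage directly from Python. The following sketch (the skewRatio column name is just illustrative; values near 1.0 mean the tasks are well balanced) uses only the columns queried above:

from pyspark.sql import functions as F

task_df = spark.table("perftaskmetrics")
(task_df.groupBy("stageId")
  .agg(F.count(F.lit(1)).alias("numTasks"),
       F.avg("executorRunTime").alias("avgRunTimeMs"),
       F.max("executorRunTime").alias("maxRunTimeMs"))
  .withColumn("skewRatio", F.round(F.col("maxRunTimeMs") / F.col("avgRunTimeMs"), 2))
  .orderBy("stageId")
  .show())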

Because you can see these metrics directly in your notebook, it is easy to identify which line of code is the bottleneck.
Once you’ve identified the stage or task, you can look for the root cause of the bottleneck in the usual way, using the Spark UI, query plans, and so forth.

Once you’ve changed your code, you can quickly check how the change affects performance with sparkMeasure. You will repeat this cycle throughout your investigation, and sparkMeasure makes it much easier to monitor these metrics along the way.

 

Reference :

GitHub – sparkMeasure
https://github.com/LucaCanali/sparkMeasure

 
