Spark ML Serving with Azure Machine Learning service

Some folks asked me whether we can deploy a Spark ML trained model in a container, like SageMaker does.
Of course you can, and with even more flexibility, since you work with your own inference code!

Using Azure, you can train the model on a Spark-based distributed platform (Azure Databricks), and then quickly deploy and serve it on Azure Container Instances (ACI) or Azure Kubernetes Service (AKS).

In this post, I show you these steps (training, saving the model, and deploying the web service) with AML Python SDK scripts.

Here we run each script one by one, but you can also automate this flow using an AML pipeline. (See here for the sample notebook.)

Note: As I’ve shown in my Hands-On, you can also serialize a trained pipeline model with the MLeap library and load it on your local (single) machine with the MLeap runtime. Unfortunately, the MLeap runtime doesn’t have a Python binding (you must use Scala or Java).
Here we don’t use MLeap.

Train your model

First, we train and generate the model on Azure Databricks.
Here we reuse my Hands-On code, which predicts flight delays, as follows.

The saved model (/mnt/testblob/flight_model) contains not just the learned model and its parameters, but the entire pipeline, including the transformations (indexing, vector assembling, etc.).

# Read dataset from Azure Blob (ADLS Gen 2)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
df = (sqlContext.read.format("csv").
  option("header", "true").
  option("nullValue", "NA").
  option("inferSchema", True).
  load("..."))  # path to the flight CSV data (not shown in the original)

# Transform DataFrame
... we skip this code ...

# Split data into train data and evaluation data
(traindf, testdf) = df.randomSplit([0.8, 0.2])

# Convert categorical values (carrier code, airport code, ...) to index values
from pyspark.ml.feature import StringIndexer
uniqueCarrierIndexer = StringIndexer(inputCol="UNIQUE_CARRIER", outputCol="Indexed_UNIQUE_CARRIER").fit(df)
originIndexer = StringIndexer(inputCol="ORIGIN", outputCol="Indexed_ORIGIN").fit(df)
destIndexer = StringIndexer(inputCol="DEST", outputCol="Indexed_DEST").fit(df)
arrDel15Indexer = StringIndexer(inputCol="ARR_DEL15", outputCol="Indexed_ARR_DEL15").fit(df)

# Assemble feature columns as a vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
  inputCols = [ ... ],  # feature column names (list not shown in the original)
  outputCol = "features")

# Generate decision tree classifier
from pyspark.ml.classification import DecisionTreeClassifier
classifier = DecisionTreeClassifier(
  featuresCol = "features",
  labelCol = "Indexed_ARR_DEL15")

# Create pipeline and Train
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[uniqueCarrierIndexer, originIndexer, destIndexer, arrDel15Indexer, assembler, classifier])
model = pipeline.fit(traindf)

# Evaluate generated model
... we skip this code ...

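# Save pipeline (fitted indexers, assembler, and classifier together)
model.write().overwrite().save("/mnt/testblob/flight_model")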
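To make the point about the saved pipeline concrete, here is a minimal pure-Python sketch (no Spark) of what the fitted StringIndexer and VectorAssembler stages do to one row. It assumes frequencyDesc ordering (the most frequent label gets index 0) with alphabetical tie-breaking, as documented for recent Spark versions; the function names and toy rows are hypothetical. Because the label-to-index mappings are fitted on training data and saved with the pipeline, the serving side can accept raw strings such as "ABQ" and get exactly the same indices.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct label to an index: most frequent label -> 0.0.
    Ties are broken alphabetically (assumed, mirroring recent Spark docs)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform_row(row, indexers, numeric_cols):
    """Replace categorical strings with their fitted indices, then append
    numeric columns into one flat feature list (a stand-in for VectorAssembler)."""
    features = []
    for col, mapping in indexers.items():
        if row[col] not in mapping:
            # Like Spark's default handleInvalid="error": reject unseen labels
            raise ValueError(f"unseen label {row[col]!r} in column {col}")
        features.append(mapping[row[col]])
    features.extend(float(row[c]) for c in numeric_cols)
    return features

# Hypothetical toy training rows (illustration only)
train = [
    {"ORIGIN": "ABQ", "DEST": "DFW", "MONTH": 1},
    {"ORIGIN": "BNA", "DEST": "DFW", "MONTH": 1},
    {"ORIGIN": "ABQ", "DEST": "ATL", "MONTH": 2},
]
indexers = {col: fit_string_indexer([r[col] for r in train]) for col in ("ORIGIN", "DEST")}
print(indexers)
print(transform_row({"ORIGIN": "BNA", "DEST": "ATL", "MONTH": 3}, indexers, ["MONTH"]))
```

As with Spark's default setting, the sketch refuses airport codes it never saw during fitting instead of silently guessing an index.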

Provision your serving

Once your model is generated, you can configure and provision serving with the Azure ML Python SDK, either on your regular machine or on Azure Databricks. (You can run the AML Python SDK without leaving Azure Databricks, but in this example I run the scripts on my local machine.)

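First, register your model into Azure ML as follows. (Model.register uploads the model folder from a local path, so the saved pipeline must be available at that path on the machine running the script.)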

from azureml.core import Workspace
from azureml.core.model import Model

ws = Workspace.from_config()

# register model in AML model management
registered_model = Model.register(
  model_path = '/mnt/testblob/flight_model',
  model_name = 'flight_model',
  workspace = ws)
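Workspace.from_config() reads the workspace details from a config.json file, searched for in the current directory and its parents. You can download this file from the Azure portal or generate it with ws.write_config(); the sketch below simply writes it with the standard library, assuming the three keys subscription_id, resource_group, and workspace_name. The helper name and all values are hypothetical placeholders.

```python
import json
import tempfile
from pathlib import Path

def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Write a config.json in the format Workspace.from_config() reads."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = Path(directory) / "config.json"
    path.write_text(json.dumps(config, indent=2))
    return path

# Placeholder values (hypothetical): substitute your own workspace details
config_dir = tempfile.mkdtemp()
config_path = write_workspace_config(
    config_dir, "<subscription-id>", "<resource-group>", "<workspace-name>")
print(config_path.read_text())
```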

Next, create your own source code for serving. The following %%writefile cell saves this source code as score.py.

As I explained in my earlier post, your source code must include init() (executed once when your service starts) and run() (invoked for each request).
As you can see below, you can still use familiar Spark commands in your script.

%%writefile score.py
import json
from azureml.core.model import Model
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel

def init():
    # Called once at startup: create the Spark session and load the saved pipeline
    global spark
    global loaded_model
    spark = SparkSession.builder.appName("flight delay serving").getOrCreate()
    model_path = Model.get_model_path(model_name="flight_model")
    loaded_model = PipelineModel.load(model_path)

def run(raw_data):
    # Called for each request: JSON rows -> Spark DataFrame -> predictions
    try:
        input_list = json.loads(raw_data)["data"]
        sc = spark.sparkContext
        input_rdd = sc.parallelize(input_list)
        input_df = input_rdd.toDF()
        pred_df = loaded_model.transform(input_df)
        pred_list = pred_df.collect()
        pred_array = [int(x["prediction"]) for x in pred_list]
        return pred_array
    except Exception as e:
        result = str(e)
        return "Internal Exception : " + result
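The init()/run() contract can be exercised locally without Spark or Azure. The sketch below is a hypothetical harness: StubModel stands in for the loaded PipelineModel and uses a toy rule (not the trained decision tree), but init() and run() have the same shape as score.py, including returning an error string instead of raising.

```python
import json

loaded_model = None

class StubModel:
    """Hypothetical stand-in for the loaded PipelineModel."""
    def predict(self, rows):
        # Toy rule for illustration only, not the trained decision tree
        return [1 if row.get("WindSpeedDest", 0) > 15 else 0 for row in rows]

def init():
    # Runs once when the service starts: load the model into a global
    global loaded_model
    loaded_model = StubModel()

def run(raw_data):
    # Runs per request: parse the JSON body, predict, return a JSON-serializable list
    try:
        rows = json.loads(raw_data)["data"]
        return loaded_model.predict(rows)
    except Exception as e:
        return "Internal Exception : " + str(e)

init()
result_ok = run(json.dumps({"data": [{"WindSpeedDest": 7.0}, {"WindSpeedDest": 18.0}]}))
result_err = run("not json")
print(result_ok)
print(result_err)
```

Returning the exception text keeps the service answering with a readable message, which is handy while debugging a new deployment.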

Finally, we configure the container and deploy your serving (web service) as follows.
This provisioning task is almost the same as the usual one (see my earlier post); the one thing specific to Spark ML serving is to set “spark-py” as the runtime.

When you set this runtime for ACI, a single container including Python, Conda, NGINX, Apache Spark, and MMLSpark is configured.
The provisioned Python script in the container first creates a Spark session whose config sets “spark.default.parallelism” to 1 (i.e., a minimal configuration with a single executor, id=”driver”), integrated with the pyspark shell. Your script is then dynamically loaded and runs on top of this provisioned script.
All of this is installed, configured, and provisioned automatically, simply by setting “spark-py” as the runtime.

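from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.image import ContainerImage
from azureml.core.conda_dependencies import CondaDependencies

# Create deploy config object
aci_conf = AciWebservice.deploy_configuration(
  description="This is a SparkML serving example.")

# Create conda dependency file
conda_dependency = CondaDependencies.create()
with open("myenv.yml", "w") as f:
  f.write(conda_dependency.serialize_to_string())

# Create image config object using conda dependency and the "spark-py" runtime
image_conf = ContainerImage.image_configuration(
  execution_script="score.py",
  runtime="spark-py",
  conda_file="myenv.yml")

# Deploy and Publish (start your service) !
svc = Webservice.deploy_from_model(
  workspace=ws,
  name="flight-delay-service",  # hypothetical service name (not shown in the original)
  models=[registered_model],
  image_config=image_conf,
  deployment_config=aci_conf)
svc.wait_for_deployment(show_output=True)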
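The way the “spark-py” runtime dynamically loads your score.py can be approximated with Python's importlib. This is only a sketch under assumptions, not the actual AML serving code: it writes a tiny hypothetical scoring script to a temporary folder, imports it from its file path, then calls init() once (as at container start) and run() per request.

```python
import importlib.util
import tempfile
from pathlib import Path

# Hypothetical minimal scoring script with the same init()/run() contract
SCORE_SOURCE = '''
import json

def init():
    global ready
    ready = True

def run(raw_data):
    return [len(json.loads(raw_data)["data"])]
'''

def load_entry_script(path):
    """Import a scoring script from a file path and return it as a module."""
    spec = importlib.util.spec_from_file_location("score", str(path))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "score.py"
    script.write_text(SCORE_SOURCE)
    score = load_entry_script(script)
    score.init()                              # once, at startup
    result = score.run('{"data": [{}, {}]}')  # per request
    print(result)
```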

Test your service!

After your service is successfully published and running, you can test it as follows.

import requests

headers = {"Content-Type":"application/json"}
# below is for AKS deployment (key-based auth)
# api_key, _ = svc.get_keys()
# headers = {"Content-Type":"application/json", "Authorization":("Bearer " + api_key)}
input_data = """
{
  "data": [
    {
      "MONTH": 1,
      "DAY_OF_WEEK": 1,
      "ORIGIN": "ABQ",
      "DEST": "DFW",
      "CRS_DEP_TIME": 9,
      "CRS_ARR_TIME": 12,
      "RelativeHumidityOrigin": 23.0,
      "AltimeterOrigin": 30.55,
      "DryBulbCelsiusOrigin": 9.4,
      "WindSpeedOrigin": 3.0,
      "VisibilityOrigin": 10.0,
      "DewPointCelsiusOrigin": -10.6,
      "RelativeHumidityDest": 35.0,
      "AltimeterDest": 30.6,
      "DryBulbCelsiusDest": 7.2,
      "WindSpeedDest": 7.0,
      "VisibilityDest": 10.0,
      "DewPointCelsiusDest": -7.2
    },
    {
      "MONTH": 1,
      "DAY_OF_WEEK": 1,
      "ORIGIN": "BNA",
      "DEST": "DFW",
      "CRS_DEP_TIME": 12,
      "CRS_ARR_TIME": 15,
      "RelativeHumidityOrigin": 78.5,
      "AltimeterOrigin": 30.05,
      "DryBulbCelsiusOrigin": 10.8,
      "WindSpeedOrigin": 1.5,
      "VisibilityOrigin": 8.0,
      "DewPointCelsiusOrigin": 7.1,
      "RelativeHumidityDest": 86.0,
      "AltimeterDest": 29.86,
      "DryBulbCelsiusDest": 9.4,
      "WindSpeedDest": 18.0,
      "VisibilityDest": 6.0,
      "DewPointCelsiusDest": 7.2
    }
  ]
}
"""
http_res = requests.post(
  svc.scoring_uri,
  data = input_data,
  headers = headers)
print("Predicted : ", http_res.text)

Result :

Predicted : [0, 0]
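Hand-writing the request body as a string is fragile: one missing brace or comma makes json.loads() in run() fail. A minimal alternative, sketched below with hypothetical helper names, builds the body with json.dumps(), adds the Bearer header only when a key is given (as in the AKS case), and decodes the returned text (such as "[0, 0]") into a list. Only a few feature columns are shown for brevity; the real model needs every column used in training.

```python
import json

def build_payload(rows):
    """Serialize feature rows into the {"data": [...]} body that run() expects."""
    return json.dumps({"data": rows})

def build_headers(api_key=None):
    """JSON content type, plus a Bearer token when key auth is used (e.g. AKS)."""
    headers = {"Content-Type": "application/json"}
    if api_key is not None:
        headers["Authorization"] = "Bearer " + api_key
    return headers

def parse_predictions(response_text):
    """Decode the body returned by run(), e.g. "[0, 0]", into a Python list."""
    return json.loads(response_text)

# Truncated example row (hypothetical subset of the features)
rows = [{"MONTH": 1, "DAY_OF_WEEK": 1, "ORIGIN": "ABQ", "DEST": "DFW"}]
print(build_payload(rows))
print(build_headers("my-key"))  # "my-key" is a placeholder
print(parse_predictions("[0, 0]"))
```

With these helpers, the call becomes requests.post(svc.scoring_uri, data=build_payload(rows), headers=build_headers()).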

