Create a Spark Cluster and Run ML Job – Azure AZTK

With AZTK (Azure Distributed Data Engineering Toolkit), you can easily deploy and tear down a Spark cluster, and you gain agility for parallel programming (say, starting with low-capacity VMs, then performance testing with larger or GPU-accelerated sizes) backed by massive cloud computing power.
AZTK is very flexible, because it uses Docker images and is built on top of Azure Batch, which itself runs on Azure Virtual Machine infrastructure (strictly speaking, VMSS infrastructure).

Now let's take a look at a machine learning tutorial with aztk.

Note : When you want to scale your machine learning workloads (not deep learning / AI workloads) on the Microsoft platform, there are the following two approaches.
The benefit of using a Spark cluster is that you can use the open-source ecosystem, such as MLlib, sparklyr, and so on.

– Use open-source tools and libraries on a Spark cluster. (This is what I show in this post.)

– Use the microsoftml and revoscalepy (RevoScaleR in R) packages with Microsoft Machine Learning Server (ML Server) on a Spark cluster. With this approach, you can run both Python jobs and R jobs on ML Server or SQL Server ML Service in a consistent way. (See my early post "Benefits of Microsoft R and Machine Learning Server (R Server)" for using RevoScaleR.)
You can install ML Server with a simple configuration in aztk. (I don't describe these settings here.)

Note : You can also use Azure HDInsight (a fully-managed Azure service for Apache Hadoop) to run Spark and ML Server under YARN management. (See my early post "Benefits of Microsoft R and Machine Learning Server (R Server)" for details.)
Currently HDInsight doesn't offer GPU instances, but AZTK does !

Note : Here we’re focusing on simple machine learning workloads, but you can use Azure Batch AI or Azure Databricks (fully-managed service) for distributed deep learning (AI) workloads in Azure.

Set up AZTK and your working directory

AZTK is built on Python, and you can install it on any platform, including Windows, Mac, Ubuntu, etc.

For provisioning the Spark cluster, aztk uses a service principal to log in to Azure. Therefore you need to create a service principal and set permissions for it as follows. (See my early post about the concept of service principals and RBAC.)
Here I show the brief steps for the aztk setup; please see the official getting-started document for detailed installation steps.

  1. Setup Python 3.5+ and pip 9.0.1+
  2. Install AZTK (Clone aztk and install with pip command)
  3. Create Azure AD service principal
  4. Create Azure Storage account and give “Contributor” permission to your service principal
  5. Create Azure Batch account and give “Contributor” permission to your service principal

After AZTK is installed, create your working directory on your client by typing the following command. This command creates the ".aztk" folder, in which the related settings are stored. (You can create multiple working folders and manage multiple clusters.)

aztk spark init

Finally, you must edit ".aztk\secrets.yaml" and fill in the settings for the previously provisioned Azure services (service principal, Batch account, storage) as follows.
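The layout of the filled-in secrets.yaml is roughly as follows. This is only a sketch with placeholder values, and the exact field names can differ between aztk versions, so follow the getting-started document and the comments inside the generated file.

# secrets.yaml (sketch with placeholder values)

service_principal:
  tenant_id: <your Azure AD tenant id>
  client_id: <your service principal application id>
  credential: <your service principal secret>
  batch_account_resource_id: /subscriptions/<subscription id>/resourceGroups/<resource group>/providers/Microsoft.Batch/batchAccounts/<batch account name>
  storage_account_resource_id: /subscriptions/<subscription id>/resourceGroups/<resource group>/providers/Microsoft.Storage/storageAccounts/<storage account name>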

Create your Spark cluster

Now we can create the Spark cluster (master node and slave nodes) using the following command.
Under the hood, AZTK uses an Ubuntu 16 image with Docker. (Later I'll show more details about the provisioning images.) Here we create a cluster of 3 Standard D2 v2 virtual machines (VMSS). As with Azure Batch, you can also use low-priority VMs for the cluster.

aztk spark cluster create \
  --id test01 \
  --vm-size standard_d2_v2 \
  --size 3 \
  --username demouser01 \
  --password P@ssw0rd

The cluster creation proceeds in the background, and you can see the provisioning state with the "aztk spark cluster get" command as follows.

aztk spark cluster get --id test01

With the "aztk spark cluster ssh" command, you can log in to the master container (not the host machine) using ssh.
Here we show the installed Python version after logging in to the master container.

# ssh into container image
aztk spark cluster ssh \
  --id test01 \
  --username demouser01

>> # check version of the installed python
>> python -V
>> 
>> Python 3.5.4

With the --host option, you can also log in to the master host (not the container).
Here we show the Docker containers running on the host machine.

# ssh into host
aztk spark cluster ssh \
  --id test01 \
  --username demouser01 \
  --host

>> # see running docker instance !
>> sudo docker ps
>> 
>> CONTAINER ID  IMAGE             COMMAND                 CREATED        STATUS        PORTS  NAMES
>> 8a2a9e06a1c6  aztk/base:latest  "/bin/bash /mnt/batc…"  5 minutes ago  Up 5 minutes         test01

As you can see in the result of the "aztk spark cluster get" command (see the output above), you can also get each host's IP address and SSH port. Therefore, if you're using a Windows client without ssh, you can log in to the host with your terminal client (PuTTY, etc.) and access the container instance from there.
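For example, once you are on the host, you should be able to open a shell inside the running Spark container (named "test01" in the docker ps output above) with a standard Docker command such as the following.

# open a shell inside the running container from the host
sudo docker exec -it test01 /bin/bash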

More about images

Please see the following Docker Hub repository for the available Docker images in aztk.
The default Docker image used in aztk is aztk/base, and all other images are built on top of aztk/base.
You can also bring your own images, as long as they are based on aztk/base.

https://hub.docker.com/r/aztk

aztk/base (the base image) includes Python 3 (currently 3.5.4) and the Spark configuration files (including PySpark, MLlib, etc.) on Ubuntu 16. aztk/python adds an Anaconda installation, and aztk/r-base adds the R runtime.
Moreover, there are image tags for each combination of Spark version (2.2.0, 2.1.0, or 1.6.3), GPU acceleration or not, and so on. For instance, if you need Spark 1.6.3 with GPU acceleration, you can use aztk/python:spark1.6.3-python3.6.2-gpu.
See the official document "Github : Azure/aztk – Docker" for more details.

Note : You can also use “aztk spark init --python” or “aztk spark init --R” for corresponding image provisioning.

After the VMs (including the Docker image) are provisioned, Azure Batch configures the container with the "docker run" command, which runs in the Azure Batch startTask. (See my early post "Azure Batch – Walkthrough and How it works" for startTask.)
With this configuration, AZTK installs and configures the Spark master and slaves in standalone mode (without YARN).

Note : You can also configure Spark cluster with HDFS using your own custom script. See the custom script sample “hdfs.sh” for details.

The images (aztk/r-base, etc.) don't include an interactive UI like Jupyter or R Server. To configure these post-installation tasks, you must use your own custom script (which also runs in the startTask) by editing .aztk/cluster.yaml. You can download several custom-script samples for aztk, including Jupyter setup and R Server setup, from the Github repository.
For instance, the following settings in .aztk/cluster.yaml install Python 3.6.2 with Anaconda on all nodes and set up Jupyter only on the master node. (Please download jupyter.sh into your current working directory beforehand.)

# cluster.yaml (setup for Conda and Jupyter)

...
docker_repo: aztk/python:spark2.2.0-python3.6.2-base
custom_scripts:
  - script: ./jupyter.sh
    runOn: master
...

Using this cluster, you can work in the interactive Jupyter Notebook IDE from your web browser as follows. (You can also use the PySpark console for interactive debugging without this UI.)

Note : Before connecting to the Jupyter Notebook UI (http://localhost:8888/), you must connect to the master node (the host, not the container instance) with an SSH tunnel that forwards port 8888 to localhost:8888, using your terminal client (PuTTY, etc.).
This configuration differs between terminal clients (see each client's documentation), and the following screenshot is a configuration example for the PuTTY client.
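If you use a plain OpenSSH client instead of PuTTY, the same port-forwarding can be expressed on the command line. Below is a minimal sketch, assuming the master host's IP address and SSH port taken from the "aztk spark cluster get" output.

# forward local port 8888 to Jupyter on the master host (replace the address and port with your own values)
ssh -L 8888:localhost:8888 -p <master ssh port> demouser01@<master ip address>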

Interactive programming with Jupyter UI (PySpark, MLlib)

Now you can start prototyping with a Spark-integrated interactive IDE, such as Jupyter Notebook, RStudio, etc.
Here we use the Jupyter Notebook configured in the previous section.

In this post, we apply a decision tree algorithm to the well-known air-flight dataset combined with a weather dataset.
Below is our dataset. To simplify the sample, I transformed the original air-flight dataset by joining the weather dataset for both the departure (origin) weather and the destination weather. (You can download it from here.)
The data size is approximately 500 MB, with approximately 2,000,000 rows.

dataset (flight_weather.csv)

"ID","MONTH","UNIQUE_CARRIER","ORIGIN","DEST","ARR_DEL15","VisibilityOrigin","DryBulbCelsiusOrigin", ...
1,1,"AA","JFK","LAX",0,10,-3.9, ...
2,3,"AA","SEA","LAX",0,10,0, ...
3,9,"FQ","LAX","JFK",0,10,18.3, ...
4,2,"FL","LAX","JFK",1,10,15.6, ...
5,1,"AA","JFK","SEA",0,10,8.9, ...

This dataset includes "ARR_DEL15", which equals 1 if the flight is delayed over 15 minutes and 0 if not. Here we predict this "ARR_DEL15" label with the Spark cluster.

Put this dataset in a location (Azure Blob storage, Azure Data Lake Store, etc.) that the cluster can access. In our case, we use Azure Blob storage.
For accessing Azure Blob storage with WASB (the HDFS extension for Azure Blob storage), you must uncomment the following settings in .aztk/core-site.xml and fill in the values.

#core-site.xml

...
<property>
  <name>fs.AbstractFileSystem.wasb.Impl</name>
  <value>org.apache.hadoop.fs.azure.Wasb</value>
</property>
<property>
  <name>fs.azure.account.key.MY_STORAGE_ACCOUNT_NAME.blob.MY_STORAGE_ACCOUNT_SUFFIX</name>
  <value>MY_STORAGE_ACCOUNT_KEY</value>
</property>
...

In my case, I set the following values.

#core-site.xml

...
<property>
  <name>fs.AbstractFileSystem.wasb.Impl</name>
  <value>org.apache.hadoop.fs.azure.Wasb</value>
</property>
<property>
  <name>fs.azure.account.key.demostore01.blob.core.windows.net</name>
  <value>CdVPm+kNDQ...</value>
</property>
...

After that, create your cluster with the "aztk spark cluster create" command (see above), and you can then access your blob storage with WASB in your Python code.

Now let's create a new IPython notebook (.ipynb file) in the Jupyter UI.

To read our dataset using WASB, write the following Python code in a cell and press "Run". (Note that the blob URL format is wasbs://{storage_container}@{storage_account}.blob.core.windows.net/{blob name}. Please change these to your own values.)

# Spark session
spark

# Load data from Azure Storage
df = spark.read.csv(
  "wasbs://container01@demostore01.blob.core.windows.net/flight_weather.csv",
  header=True,
  inferSchema=True)

If it succeeds, the data is loaded as a PySpark DataFrame. You can show the data as a table using toPandas(), for example as follows.
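For example, in a notebook cell you can check the schema and peek at the first few rows like this (limiting the rows before converting, since toPandas() pulls the data to the driver):

# show the schema and the first 5 rows as a Pandas table
df.printSchema()
df.limit(5).toPandas()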

To use a decision tree in MLlib (the machine learning library for Spark), we must convert categorical data (say, airport codes like "LAX", "SEA", "JFK", etc.) to numeric indices 0, 1, ... The following code converts each of these values, such as UNIQUE_CARRIER (carrier code), ORIGIN (airport code), and DEST (airport code), with PySpark built-in functions.

If you're not familiar with PySpark, this code written in a distributed style (unlike Pandas) may look cumbersome. With PySpark data types and functions, the data itself is not materialized until it is really needed, and of course these operations are distributed wherever possible.

Note : See “Apache Spark document – pyspark package” for available functions.

from pyspark.sql.functions import when
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

#
# Select only label and features
#
df = df.select(
  "ARR_DEL15",
  "MONTH",
  "DAY_OF_MONTH",
  "DAY_OF_WEEK",
  "UNIQUE_CARRIER",
  "ORIGIN",
  "DEST",
  "CRS_DEP_TIME",
  "CRS_ARR_TIME",
  "RelativeHumidityOrigin",
  "AltimeterOrigin",
  "DryBulbCelsiusOrigin",
  "WindSpeedOrigin",
  "VisibilityOrigin",
  "DewPointCelsiusOrigin",
  "RelativeHumidityDest",
  "AltimeterDest",
  "DryBulbCelsiusDest",
  "WindSpeedDest",
  "VisibilityDest",
  "DewPointCelsiusDest")

#
# Drop rows having non-numeric field (like "N/A", "null", etc)
#

# ARR_DEL15
df = df.withColumn("ARR_DEL15",
  df.ARR_DEL15.cast("integer"))
df = df.dropna(subset=["ARR_DEL15"])

# MONTH
df = df.withColumn("MONTH",
  df.MONTH.cast("integer"))
df = df.dropna(subset=["MONTH"])

# DAY_OF_MONTH
df = df.withColumn("DAY_OF_MONTH",
  df.DAY_OF_MONTH.cast("integer"))
df = df.dropna(subset=["DAY_OF_MONTH"])

# DAY_OF_WEEK
df = df.withColumn("DAY_OF_WEEK",
  df.DAY_OF_WEEK.cast("integer"))
df = df.dropna(subset=["DAY_OF_WEEK"])

# CRS_DEP_TIME
df = df.withColumn("CRS_DEP_TIME",
  df.CRS_DEP_TIME.cast("integer"))
df = df.dropna(subset=["CRS_DEP_TIME"])

# CRS_ARR_TIME
df = df.withColumn("CRS_ARR_TIME",
  df.CRS_ARR_TIME.cast("integer"))
df = df.dropna(subset=["CRS_ARR_TIME"])

# RelativeHumidityOrigin
df = df.withColumn("RelativeHumidityOrigin",
  df.RelativeHumidityOrigin.cast("double"))
df = df.dropna(subset=["RelativeHumidityOrigin"])

# AltimeterOrigin
df = df.withColumn("AltimeterOrigin",
  df.AltimeterOrigin.cast("double"))
df = df.dropna(subset=["AltimeterOrigin"])

# DryBulbCelsiusOrigin
df = df.withColumn("DryBulbCelsiusOrigin",
  df.DryBulbCelsiusOrigin.cast("double"))
df = df.dropna(subset=["DryBulbCelsiusOrigin"])

# WindSpeedOrigin
df = df.withColumn("WindSpeedOrigin",
  df.WindSpeedOrigin.cast("double"))
df = df.dropna(subset=["WindSpeedOrigin"])

# VisibilityOrigin
df = df.withColumn("VisibilityOrigin",
  df.VisibilityOrigin.cast("double"))
df = df.dropna(subset=["VisibilityOrigin"])

# DewPointCelsiusOrigin
df = df.withColumn("DewPointCelsiusOrigin",
  df.DewPointCelsiusOrigin.cast("double"))
df = df.dropna(subset=["DewPointCelsiusOrigin"])

# RelativeHumidityDest
df = df.withColumn("RelativeHumidityDest",
  df.RelativeHumidityDest.cast("double"))
df = df.dropna(subset=["RelativeHumidityDest"])

# AltimeterDest
df = df.withColumn("AltimeterDest",
  df.AltimeterDest.cast("double"))
df = df.dropna(subset=["AltimeterDest"])

# DryBulbCelsiusDest
df = df.withColumn("DryBulbCelsiusDest",
  df.DryBulbCelsiusDest.cast("double"))
df = df.dropna(subset=["DryBulbCelsiusDest"])

# WindSpeedDest
df = df.withColumn("WindSpeedDest",
  df.WindSpeedDest.cast("double"))
df = df.dropna(subset=["WindSpeedDest"])

# VisibilityDest
df = df.withColumn("VisibilityDest",
  df.VisibilityDest.cast("double"))
df = df.dropna(subset=["VisibilityDest"])

# DewPointCelsiusDest
df = df.withColumn("DewPointCelsiusDest",
  df.DewPointCelsiusDest.cast("double"))
df = df.dropna(subset=["DewPointCelsiusDest"])

#
# Convert categorical value to numeric index (0, 1, ...)
#

# MONTH
df = df.withColumn("MONTH",
  df.MONTH - 1)

# DAY_OF_MONTH
df = df.withColumn("DAY_OF_MONTH",
  df.DAY_OF_MONTH - 1)

# DAY_OF_WEEK
df = df.withColumn("DAY_OF_WEEK",
  df.DAY_OF_WEEK - 1)

# UNIQUE_CARRIER
rows_unique_carrier = df.select("UNIQUE_CARRIER").distinct().collect()
list_unique_carrier = [i.UNIQUE_CARRIER for i in rows_unique_carrier]
convUniqueCarrier = UserDefinedFunction(
  lambda x: list_unique_carrier.index(x), IntegerType())
df = df.withColumn("UNIQUE_CARRIER",
  when(df["UNIQUE_CARRIER"].isNotNull(),
     convUniqueCarrier(df.UNIQUE_CARRIER)).otherwise(len(list_unique_carrier)))

# ORIGIN
rows_origin = df.select("ORIGIN").distinct().collect()
list_origin = [i.ORIGIN for i in rows_origin]
convOrigin = UserDefinedFunction(lambda x: list_origin.index(x), IntegerType())
df = df.withColumn("ORIGIN",
  when(df["ORIGIN"].isNotNull(),
     convOrigin(df.ORIGIN)).otherwise(len(list_origin)))

# DEST
rows_dest = df.select("DEST").distinct().collect()
list_dest = [i.DEST for i in rows_dest]
convDest = UserDefinedFunction(lambda x: list_dest.index(x), IntegerType())
df = df.withColumn("DEST",
  when(df["DEST"].isNotNull(),
     convDest(df.DEST)).otherwise(len(list_dest)))

Training data for the MLlib decision tree must be LabeledPoint objects, and the next code converts our dataset to LabeledPoint.
Here we use "ARR_DEL15" (0 or 1, depending on whether the flight is delayed over 15 minutes) as the predicted label and several other columns ("DAY_OF_WEEK", "UNIQUE_CARRIER", "WindSpeedDest", etc.) as the input features (variables).

from pyspark.mllib.regression import LabeledPoint

# Create LabeledPoint object (label is "ARR_DEL15")
rdd = df.rdd.map(
  lambda row: LabeledPoint(
    row.ARR_DEL15,
    [
      row.MONTH,
      row.DAY_OF_MONTH,
      row.DAY_OF_WEEK,
      row.UNIQUE_CARRIER,
      row.ORIGIN,
      row.DEST,
      row.CRS_DEP_TIME,
      row.CRS_ARR_TIME,
      row.RelativeHumidityOrigin,
      row.AltimeterOrigin,
      row.DryBulbCelsiusOrigin,
      row.WindSpeedOrigin,
      row.VisibilityOrigin,
      row.DewPointCelsiusOrigin,
      row.RelativeHumidityDest,
      row.AltimeterDest,
      row.DryBulbCelsiusDest,
      row.WindSpeedDest,
      row.VisibilityDest,
      row.DewPointCelsiusDest
    ]))

Now let's train the model with our data using the MLlib decision tree as follows !
After the model is generated, we save it in Azure Blob storage with WASB.

# Split data for training (70%) and testing (30%)
trainrdd, testrdd = rdd.randomSplit([0.7, 0.3], 17)

# Run Decision Tree algorithms in MLlib !
# (model is generated)
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
model = DecisionTree.trainClassifier(
  trainrdd,
  numClasses=2,
  categoricalFeaturesInfo={
    0: 12,
    1: 31,
    2: 7,
    3: len(list_unique_carrier) + 1,
    4: len(list_origin) + 1,
    5: len(list_dest) + 1,
    6: 24,
    7: 24},
  impurity='entropy',
  maxDepth=7,
  maxBins=300)

# save model
model.save(sc,
  "wasbs://container01@demostore01.blob.core.windows.net/model/flightdelay")

Of course, you can monitor the running job with the Spark Web UI (http://localhost:8080/ or http://localhost:4040/), as in the following screenshot.
Please make sure that these ports are also forwarded in your SSH tunnel as I previously explained. (See the previous section for the SSH tunnel configuration.)

Job Monitoring on 4040

Now you can predict using the generated model.
The following code predicts with the test data and shows the ratio (accuracy) of how many rows are correctly predicted. Here we just calculate the accuracy, but you can also evaluate more detailed metrics such as true positives, true negatives, false positives, or false negatives, as in the sketch after the code below.

# Predict using test data and set row index
preds = model.predict(testrdd.map(lambda p: p.features))
preds = preds.zipWithIndex()
preds = preds.map(lambda x: (x[1], x[0]))
# Get actual label and set row index
labels = testrdd.map(lambda x: x.label)
labels = labels.zipWithIndex()
labels = labels.map(lambda x: (x[1], x[0]))
# Join and Get accuracy (%)
labels_and_preds = labels.join(preds)
accuracy = labels_and_preds.filter(
  lambda x: x[1][0] == x[1][1]).count() / float(testrdd.count())
print("Accuracy is %f" % accuracy)  # output accuracy

Submit as Spark application

Your code is ready. Next we submit (or schedule) the logic as a Spark application.
With aztk, you can submit your application from your working client without running spark-submit yourself. Here we submit the previous sample workload (creating and saving the model for air-flight delay) as a Spark Python application.

Below is the complete code; almost all of it is the same as the previous PySpark code, except for initializing and stopping the Spark session. (In the previous notebook example, the underlying PySpark shell performs this initialization.)
We assume that we saved this Python code as "test.py" in our working directory on the client.

test.py

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType
from pyspark.mllib.regression import LabeledPoint

if __name__ == "__main__":
  spark = SparkSession.builder.appName("jobtest01").getOrCreate()
  sc = spark.sparkContext

  #
  # Load data from Azure Storage
  #
  df = spark.read.csv(
    "wasbs://container01@demostore01.blob.core.windows.net/flight_weather.csv",
    header=True,
    inferSchema=True)

  #
  # Select only label and features
  #
  df = df.select(
    "ARR_DEL15",
    "MONTH",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "UNIQUE_CARRIER",
    "ORIGIN",
    "DEST",
    "CRS_DEP_TIME",
    "CRS_ARR_TIME",
    "RelativeHumidityOrigin",
    "AltimeterOrigin",
    "DryBulbCelsiusOrigin",
    "WindSpeedOrigin",
    "VisibilityOrigin",
    "DewPointCelsiusOrigin",
    "RelativeHumidityDest",
    "AltimeterDest",
    "DryBulbCelsiusDest",
    "WindSpeedDest",
    "VisibilityDest",
    "DewPointCelsiusDest")

  #
  # Drop rows having non-numeric field (like "N/A", "null", etc)
  #

  # ARR_DEL15
  df = df.withColumn("ARR_DEL15",
    df.ARR_DEL15.cast("integer"))
  df = df.dropna(subset=["ARR_DEL15"])

  # MONTH
  df = df.withColumn("MONTH",
    df.MONTH.cast("integer"))
  df = df.dropna(subset=["MONTH"])

  # DAY_OF_MONTH
  df = df.withColumn("DAY_OF_MONTH",
    df.DAY_OF_MONTH.cast("integer"))
  df = df.dropna(subset=["DAY_OF_MONTH"])

  # DAY_OF_WEEK
  df = df.withColumn("DAY_OF_WEEK",
    df.DAY_OF_WEEK.cast("integer"))
  df = df.dropna(subset=["DAY_OF_WEEK"])

  # CRS_DEP_TIME
  df = df.withColumn("CRS_DEP_TIME",
    df.CRS_DEP_TIME.cast("integer"))
  df = df.dropna(subset=["CRS_DEP_TIME"])

  # CRS_ARR_TIME
  df = df.withColumn("CRS_ARR_TIME",
    df.CRS_ARR_TIME.cast("integer"))
  df = df.dropna(subset=["CRS_ARR_TIME"])

  # RelativeHumidityOrigin
  df = df.withColumn("RelativeHumidityOrigin",
    df.RelativeHumidityOrigin.cast("double"))
  df = df.dropna(subset=["RelativeHumidityOrigin"])

  # AltimeterOrigin
  df = df.withColumn("AltimeterOrigin",
    df.AltimeterOrigin.cast("double"))
  df = df.dropna(subset=["AltimeterOrigin"])

  # DryBulbCelsiusOrigin
  df = df.withColumn("DryBulbCelsiusOrigin",
    df.DryBulbCelsiusOrigin.cast("double"))
  df = df.dropna(subset=["DryBulbCelsiusOrigin"])

  # WindSpeedOrigin
  df = df.withColumn("WindSpeedOrigin",
    df.WindSpeedOrigin.cast("double"))
  df = df.dropna(subset=["WindSpeedOrigin"])

  # VisibilityOrigin
  df = df.withColumn("VisibilityOrigin",
    df.VisibilityOrigin.cast("double"))
  df = df.dropna(subset=["VisibilityOrigin"])

  # DewPointCelsiusOrigin
  df = df.withColumn("DewPointCelsiusOrigin",
    df.DewPointCelsiusOrigin.cast("double"))
  df = df.dropna(subset=["DewPointCelsiusOrigin"])

  # RelativeHumidityDest
  df = df.withColumn("RelativeHumidityDest",
    df.RelativeHumidityDest.cast("double"))
  df = df.dropna(subset=["RelativeHumidityDest"])

  # AltimeterDest
  df = df.withColumn("AltimeterDest",
    df.AltimeterDest.cast("double"))
  df = df.dropna(subset=["AltimeterDest"])

  # DryBulbCelsiusDest
  df = df.withColumn("DryBulbCelsiusDest",
    df.DryBulbCelsiusDest.cast("double"))
  df = df.dropna(subset=["DryBulbCelsiusDest"])

  # WindSpeedDest
  df = df.withColumn("WindSpeedDest",
    df.WindSpeedDest.cast("double"))
  df = df.dropna(subset=["WindSpeedDest"])

  # VisibilityDest
  df = df.withColumn("VisibilityDest",
    df.VisibilityDest.cast("double"))
  df = df.dropna(subset=["VisibilityDest"])

  # DewPointCelsiusDest
  df = df.withColumn("DewPointCelsiusDest",
    df.DewPointCelsiusDest.cast("double"))
  df = df.dropna(subset=["DewPointCelsiusDest"])

  #
  # Convert categorical value to numeric index (0, 1, ...)
  #

  # MONTH
  df = df.withColumn("MONTH",
    df.MONTH - 1)

  # DAY_OF_MONTH
  df = df.withColumn("DAY_OF_MONTH",
    df.DAY_OF_MONTH - 1)

  # DAY_OF_WEEK
  df = df.withColumn("DAY_OF_WEEK",
    df.DAY_OF_WEEK - 1)

  # UNIQUE_CARRIER
  rows_unique_carrier = df.select("UNIQUE_CARRIER").distinct().collect()
  list_unique_carrier = [i.UNIQUE_CARRIER for i in rows_unique_carrier]
  convUniqueCarrier = UserDefinedFunction(
    lambda x: list_unique_carrier.index(x), IntegerType())
  df = df.withColumn("UNIQUE_CARRIER",
    when(df["UNIQUE_CARRIER"].isNotNull(),
       convUniqueCarrier(df.UNIQUE_CARRIER)).otherwise(len(list_unique_carrier)))

  # ORIGIN
  rows_origin = df.select("ORIGIN").distinct().collect()
  list_origin = [i.ORIGIN for i in rows_origin]
  convOrigin = UserDefinedFunction(lambda x: list_origin.index(x), IntegerType())
  df = df.withColumn("ORIGIN",
    when(df["ORIGIN"].isNotNull(),
       convOrigin(df.ORIGIN)).otherwise(len(list_origin)))

  # DEST
  rows_dest = df.select("DEST").distinct().collect()
  list_dest = [i.DEST for i in rows_dest]
  convDest = UserDefinedFunction(lambda x: list_dest.index(x), IntegerType())
  df = df.withColumn("DEST",
    when(df["DEST"].isNotNull(),
       convDest(df.DEST)).otherwise(len(list_dest)))

  #
  # Create LabeledPoint object (label is "ARR_DEL15")
  #
  rdd = df.rdd.map(
    lambda row: LabeledPoint(
      row.ARR_DEL15,
      [
        row.MONTH,
        row.DAY_OF_MONTH,
        row.DAY_OF_WEEK,
        row.UNIQUE_CARRIER,
        row.ORIGIN,
        row.DEST,
        row.CRS_DEP_TIME,
        row.CRS_ARR_TIME,
        row.RelativeHumidityOrigin,
        row.AltimeterOrigin,
        row.DryBulbCelsiusOrigin,
        row.WindSpeedOrigin,
        row.VisibilityOrigin,
        row.DewPointCelsiusOrigin,
        row.RelativeHumidityDest,
        row.AltimeterDest,
        row.DryBulbCelsiusDest,
        row.WindSpeedDest,
        row.VisibilityDest,
        row.DewPointCelsiusDest
      ]))

  #
  # Split data for training (70%) and testing (30%)
  #
  trainrdd, testrdd = rdd.randomSplit([0.7, 0.3], 17)

  #
  # Run Decision Tree algorithms in MLlib !
  # (model is generated)
  from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
  model = DecisionTree.trainClassifier(
    trainrdd,
    numClasses=2,
    categoricalFeaturesInfo={
      0: 12,
      1: 31,
      2: 7,
      3: len(list_unique_carrier) + 1,
      4: len(list_origin) + 1,
      5: len(list_dest) + 1,
      6: 24,
      7: 24},
    impurity='entropy',
    maxDepth=7,
    maxBins=300)

  #
  # save model
  #
  model.save(sc, "wasbs://container01@demostore01.blob.core.windows.net/model/flightdelay")

  #
  # Predict using test data and set row index
  #
  preds = model.predict(testrdd.map(lambda p: p.features))
  preds = preds.zipWithIndex()
  preds = preds.map(lambda x: (x[1], x[0]))

  #
  # Get actual label and set row index
  #
  labels = testrdd.map(lambda x: x.label)
  labels = labels.zipWithIndex()
  labels = labels.map(lambda x: (x[1], x[0]))

  #
  # Join and Get accuracy
  #
  labels_and_preds = labels.join(preds)
  accuracy = labels_and_preds.filter(
    lambda x: x[1][0] == x[1][1]).count() / float(testrdd.count())
  print("Accuracy is %f" % accuracy)  # output accuracy

  spark.stop()

Submit this Python application (test.py) to our Spark cluster with the following command. After the application completes, you will be able to view the generated model in your Azure Blob storage.

aztk spark cluster submit \
  --id test01 \
  --name jobtest01 \
  test.py
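Depending on your aztk version, you may also be able to fetch the driver output of the submitted application from your client with an app-logs command; the following is only a sketch, so check "aztk spark cluster --help" for the exact sub-commands available in your version.

# get the driver log of the submitted application (sketch)
aztk spark cluster app-logs \
  --id test01 \
  --name jobtest01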

As with the previous job in the PySpark shell, you can monitor this application using the Spark Web UI as follows. (Note that the "pyspark-shell" entry below is the interactive process from the Jupyter Notebook.)

Application Monitoring on 8080 (resource manager UI for standalone mode)

 

When you are all done, you can delete the cluster (computing nodes) with the following command to save money !

# delete cluster
aztk spark cluster delete --id test01

 

In this post we used Python for our tutorial, but you can also use R with aztk, using SparklyR instead of PySpark and RStudio Server as the interactive UI. Of course, you can also use your cluster for Scala and Java workloads with aztk. (Use "aztk spark init --scala" or "aztk spark init --java".)

Github : SparklyR on Azure with AZTK
https://github.com/Azure/aztk/wiki/SparklyR-on-Azure-with-AZTK

 


Azure AD v2 endpoint – How to use custom scopes for admin consent

In my early post I explained administrator consent (admin consent) in the Azure AD v2 endpoint. Admin consent is very useful and is needed for various scenarios, such as application permissions (application-level privileges without an interactive sign-in UI), granting permission for all employees without individual user consents, or the on-behalf-of flow in your web API.

To use admin consent, you must pre-define the permissions in the portal UI (App Registration Portal). But in this UI you can set scopes only for Microsoft Graph (see the following screenshot), and some ISV folks ask me "how can we use admin consent for legacy APIs (Outlook REST API, etc.), 3rd-party apps, or our own custom apps ?".

The answer is: "edit the manifest yourself !"

In this post I show you, step by step, how to configure these custom settings in the Azure AD v2 endpoint with the application manifest.

Permissions in Azure AD v2 endpoint – Under the hood

Now let’s take a look at predefined scopes in v2 endpoint.

For scenarios such as application permissions (application-level privileges without an interactive sign-in UI), granting delegated permissions to all employees without individual user consents, and so on, you can pre-define the needed scopes for the application within the App Registration Portal.

Note !  For usual user consent, you don't need to pre-define scopes in the Azure AD v2 endpoint; you can set scopes dynamically (on the fly) when you authenticate.
The pre-defined permissions are needed for admin consent.

These settings are all written in the application manifest, and you can edit the manifest directly with the "Edit Application Manifest" button (see below) in the App Registration Portal.

For example, when you set the "offline_access" scope and the "openid" scope in your app, they are written in the manifest as follows.

{
  "id": "678806b7-adee-4f4e-b548-947e10512e00",
  "appId": "c471620b-73a3-4a88-a926-eda184e6fde9",
  "name": "testapp01",
  "oauth2AllowImplicitFlow": false,
  "requiredResourceAccess": [
    {
      "resourceAppId": "00000003-0000-0000-c000-000000000000",
      "resourceAccess": [
        {
          "id": "7427e0e9-2fba-42fe-b0c0-848c9e6a8182",
          "type": "Scope"
        },
        {
          "id": "37f7f235-527c-4136-accd-4a02d197296e",
          "type": "Scope"
        }
      ]
    }
  ],
  ...
  
}

As you can see, the app's ID and the scope's ID are used in these settings. For example, 00000003-0000-0000-c000-000000000000 is the "Microsoft Graph" application and 7427e0e9-2fba-42fe-b0c0-848c9e6a8182 is the "offline_access" scope.
The following is the list of IDs for the familiar scopes. (All of these scopes are in the "Microsoft Graph" application.)

openid 37f7f235-527c-4136-accd-4a02d197296e
email 64a6cdd6-aab1-4aaf-94b8-3cc8405e90d0
profile 14dad69e-099b-42c9-810b-d002981feec1
offline_access 7427e0e9-2fba-42fe-b0c0-848c9e6a8182

In conclusion, you just edit the manifest yourself when you need to add custom scopes other than those of the Microsoft Graph application.
The next question is: how do you find the app's ID and the scope's ID ?

Use permissions for other APIs

When you want to use the Outlook REST API, OneDrive API, Azure AD Graph API, Power BI REST API, and so on, first go to the Azure Active Directory settings in the Azure Portal.
In these settings, you can add those APIs as required permissions, and you can see the app's ID and the scope's ID in the manifest text as follows. (The following is a sample for the https://outlook.office.com/mail.read scope.)

You can use the same IDs in the v2 endpoint and set these scopes using the manifest editor in the App Registration Portal as follows.

{
  "id": "678806b7-adee-4f4e-b548-947e10512e00",
  "appId": "c471620b-73a3-4a88-a926-eda184e6fde9",
  "name": "testapp01",
  "oauth2AllowImplicitFlow": false,
  "requiredResourceAccess": [
    {
      "resourceAppId": "00000003-0000-0000-c000-000000000000",
      "resourceAccess": [
        {
          "id": "7427e0e9-2fba-42fe-b0c0-848c9e6a8182",
          "type": "Scope"
        },
        {
          "id": "37f7f235-527c-4136-accd-4a02d197296e",
          "type": "Scope"
        }
      ]
    },
    {
      "resourceAppId": "00000002-0000-0ff1-ce00-000000000000",
      "resourceAccess": [
        {
          "id": "185758ba-798d-4b72-9e54-429a413a2510",
          "type": "Scope"
        }
      ]
    }
  ],
  ...
  
}

Now let's access the following admin consent URL of the v2 endpoint with your browser. (Change testdirectory.onmicrosoft.com, c471620b-73a3-4a88-a926-eda184e6fde9, and https://localhost/testapp01 to your own values.)

https://login.microsoftonline.com/testdirectory.onmicrosoft.com/adminconsent?client_id=c471620b-73a3-4a88-a926-eda184e6fde9&state=12345&redirect_uri=https%3a%2f%2flocalhost%2ftestapp01

After you log in with an administrator account, the following consent screen is displayed. Note that this Mail.Read scope is not in Microsoft Graph, but in the Outlook REST API. Therefore, when you get the e-mails in your inbox, you must access https://outlook.office365.com/api/{version}/me/messages instead of https://graph.microsoft.com/{version}/me/messages. (You cannot use https://graph.microsoft.com/{version}/me/messages with this consent.)

Use permissions for your own custom applications

As I mentioned in my early post "Build your own Web API protected by Azure AD v2.0 endpoint with custom scopes", you can define your own custom scopes for your API applications in the App Registration Portal.

For example, assume that 2 scopes are defined in our API application as in the following screenshot in the App Registration Portal.

These scopes will be written in the manifest as follows.
In this case, the app’s ID is c4894da7-0070-486e-af4c-e1c2ba5e24ae, and the scope’s IDs are 223e6396-1b01-4a16-bb2f-03eaed9f31a8 and 658e7fa5-bb32-4ed1-93eb-b040ebc6bfec.

{
  "id": "2cd5c8e0-6a6c-4e8d-870c-46c08f3be3d9",
  "appId": "c4894da7-0070-486e-af4c-e1c2ba5e24ae",
  "identifierUris": [
    "api://c4894da7-0070-486e-af4c-e1c2ba5e24ae"
  ],
  "oauth2Permissions": [
    {
      "adminConsentDescription": "Allow the client to write your data with myapi01 on behalf of the signed-in user.",
      "adminConsentDisplayName": "Write data with myapi",
      "id": "223e6396-1b01-4a16-bb2f-03eaed9f31a8",
      "isEnabled": true,
      "lang": null,
      "origin": "Application",
      "type": "User",
      "userConsentDescription": "Allow the client to write your data with myapi01 on your behalf.",
      "userConsentDisplayName": "Write data with myapi",
      "value": "write_as_user"
    },
    {
      "adminConsentDescription": "Allow the client to read your data with myapi01 on behalf of the signed-in user.",
      "adminConsentDisplayName": "Read data with myapi",
      "id": "658e7fa5-bb32-4ed1-93eb-b040ebc6bfec",
      "isEnabled": true,
      "lang": null,
      "origin": "Application",
      "type": "User",
      "userConsentDescription": "Allow the client to read your data with myapi01 on your behalf.",
      "userConsentDisplayName": "Read data with myapi",
      "value": "read_as_user"
    }
  ],
  ...
  
}

Therefore, on the client side, you can set these scopes of the api application using the manifest editor as follows.

{
  "id": "678806b7-adee-4f4e-b548-947e10512e00",
  "appId": "c471620b-73a3-4a88-a926-eda184e6fde9",
  "name": "testapp01",
  "oauth2AllowImplicitFlow": false,
  "requiredResourceAccess": [
    {
      "resourceAppId": "00000003-0000-0000-c000-000000000000",
      "resourceAccess": [
        {
          "id": "7427e0e9-2fba-42fe-b0c0-848c9e6a8182",
          "type": "Scope"
        },
        {
          "id": "37f7f235-527c-4136-accd-4a02d197296e",
          "type": "Scope"
        }
      ]
    },
    {
      "resourceAppId": "c4894da7-0070-486e-af4c-e1c2ba5e24ae",
      "resourceAccess": [
        {
          "id": "658e7fa5-bb32-4ed1-93eb-b040ebc6bfec",
          "type": "Scope"
        }
      ]
    }
  ],
  ...
  
}

First, the user (administrator) must consent to (subscribe to) the API application before this API's permissions can be used in the client application.
Let's access the following URL, in which c4894da7-0070-486e-af4c-e1c2ba5e24ae is the app's ID of the API application. (See my early post "Build your own Web API protected by Azure AD v2.0 endpoint with custom scopes" for details.)

https://login.microsoftonline.com/testdirectory.onmicrosoft.com/adminconsent?client_id=c4894da7-0070-486e-af4c-e1c2ba5e24ae&state=12345&redirect_uri=https%3a%2f%2flocalhost%2fmyapi01

After you've consented to the API application, let's access the following admin consent URL for the client application, in which c471620b-73a3-4a88-a926-eda184e6fde9 is the app's ID of the client application.

https://login.microsoftonline.com/testdirectory.onmicrosoft.com/adminconsent?client_id=c471620b-73a3-4a88-a926-eda184e6fde9&state=12345&redirect_uri=https%3a%2f%2flocalhost%2ftestapp01

After you log in with the administrator account, you will find that the following consent screen is displayed.

Azure Batch AI – Walkthrough and How it works

In my previous post, I showed how to set up and configure training with multiple machines.
Using Azure Batch AI, you'll find that it's very straightforward and simplifies many tasks for AI workloads, especially distributed training workloads.

Azure Batch AI is built on top of Azure Batch (see my early post), so you can easily understand this mechanism if you're familiar with Azure Batch.
Now let's see how Batch AI works and how you can use it !

Here we use the Azure CLI (Command Line Interface), but Batch AI is also integrated with tool UIs like the Azure Portal, Visual Studio, and Visual Studio Code.

Note : You can also use Azure Databricks (which is fully-managed service built on top of Spark) for distributed deep learning workloads in Azure.

Simplifies the distributed training

First, we assume that you are familiar with the basic concepts of distributed training with Cognitive Toolkit (CNTK). In this post we use the following distributed-training sample code (Train_MNIST.py) with training data (Train-28x28_cntk_text.txt), which is exactly the same sample code as in my previous post. (See "Walkthrough – Distributed training with CNTK" for details.)
This sample trains a convolutional neural network (LeNet) in a distributed manner and saves the trained model (ConvNet_MNIST.dnn) after the training is done.

Train_MNIST.py

import numpy as np
import sys
import os
import cntk
from cntk.train.training_session import *

# folder which includes "Train-28x28_cntk_text.txt"
data_path = sys.argv[1]
# folder for model output
model_path = sys.argv[2]

# Creates and trains a feedforward classification model for MNIST images
def train_mnist():
  # variables denoting the features and label data
  input_var = cntk.input_variable((1, 28, 28))
  label_var = cntk.input_variable(10)

  # instantiate the feedforward classification model
  with cntk.layers.default_options(
    init = cntk.layers.glorot_uniform(),
    activation=cntk.ops.relu):
    h = input_var/255.0
    h = cntk.layers.Convolution2D((5,5), 32, pad=True)(h)
    h = cntk.layers.MaxPooling((3,3), (2,2))(h)
    h = cntk.layers.Convolution2D((3,3), 48)(h)
    h = cntk.layers.MaxPooling((3,3), (2,2))(h)
    h = cntk.layers.Convolution2D((3,3), 64)(h)
    h = cntk.layers.Dense(96)(h)
    h = cntk.layers.Dropout(0.5)(h)
    z = cntk.layers.Dense(10, activation=None)(h)

  # create source for training data
  reader_train = cntk.io.MinibatchSource(
    cntk.io.CTFDeserializer(
      os.path.join(data_path, 'Train-28x28_cntk_text.txt'),
      cntk.io.StreamDefs(
        features = cntk.io.StreamDef(field='features', shape=28 * 28),
        labels = cntk.io.StreamDef(field='labels', shape=10)
      )),
      randomize=True,
      max_samples = 60000,
      multithreaded_deserializer=True)

  # create trainer
  epoch_size = 60000
  minibatch_size = 64
  max_epochs = 40

  lr_schedule = cntk.learning_rate_schedule(
    0.2,
    cntk.UnitType.minibatch)
  learner = cntk.sgd(
    z.parameters,
    lr_schedule)
  dist_learner = cntk.train.distributed.data_parallel_distributed_learner(
    learner = learner,
    num_quantization_bits=32, # non-quantized SGD
    distributed_after=0) # all data is parallelized

  progress_printer = cntk.logging.ProgressPrinter(
    tag='Training',
    num_epochs=max_epochs,
    freq=100,
    log_to_file=None,
    rank=cntk.train.distributed.Communicator.rank(),
    gen_heartbeat=True,
    distributed_freq=None)
  ce = cntk.losses.cross_entropy_with_softmax(
    z,
    label_var)
  pe = cntk.metrics.classification_error(
    z,
    label_var)
  trainer = cntk.Trainer(
    z,
    (ce, pe),
    dist_learner,
    progress_printer)

  # define mapping from reader streams to network inputs
  input_map = {
    input_var : reader_train.streams.features,
    label_var : reader_train.streams.labels
  }

  cntk.logging.log_number_of_parameters(z) ; print()

  session = training_session(
    trainer=trainer,
    mb_source = reader_train,
    model_inputs_to_streams = input_map,
    mb_size = minibatch_size,
    progress_frequency=epoch_size,
    checkpoint_config = CheckpointConfig(
      frequency = epoch_size,
      filename = os.path.join(
        model_path, "ConvNet_MNIST"),
      restore = False),
    test_config = None
  )
  session.train()

  # save model only by one process
  if 0 == cntk.train.distributed.Communicator.rank():
    z.save(
      os.path.join(
        model_path,
        "ConvNet_MNIST.dnn"))

  # clean-up distribution
  cntk.train.distributed.Communicator.finalize();

  return

if __name__=='__main__':
  train_mnist()

Train-28x28_cntk_text.txt

|labels 0 0 0 0 0 0 0 1 0 0 |features 0 3 18 18 126... (784 pixel data)
|labels 0 1 0 0 0 0 0 0 0 0 |features 0 0 139 0 253... (784 pixel data)
...

Azure Batch AI uses a shared script and shared data (both input data and model output) for distributed training. Therefore it uses shared storage such as Azure Blob storage, Azure file shares, or NFS file servers, as illustrated below.
In this post we use Azure Blob storage, and we assume that the storage account name is "batchaistore01" and the container name is "share01".

Before starting, place the previous Train_MNIST.py (script) and Train-28x28_cntk_text.txt (input data) in the blob container.

Now let's create the cluster (multiple computing nodes) with the following CLI command.
Here we're creating 2 Data Science Virtual Machine (DSVM) nodes, on which the required software is all pre-configured, including GPU drivers, OpenMPI, and GPU-enabled deep learning libraries (TensorFlow, Cognitive Toolkit, Caffe, Keras, etc.).
Instead of DSVM, you can also use plain Ubuntu 16.04 LTS (use "UbuntuLTS" instead of "UbuntuDSVM" in the following command) and create a job with a container image in which the required software is configured. (Later I'll show you how to use a container image in the job.)

Note : When you don't need distributed training (that is, you use only a single node for training), you can just specify "--min 1" and "--max 1" in the following command.

Note : Currently (in preview) the supported regions are EastUS, WestUS2, and WestEurope, and the available VM sizes (pricing tiers) are the D-series, E-series, F-series, NC-series, and ND-series.

# create cluster (computing nodes)
az batchai cluster create --name cluster01 \
  --resource-group testrg01 \
  --location eastus \
  --vm-size STANDARD_NC6 \
  --image UbuntuDSVM \
  --min 2 --max 2 \
  --storage-account-name batchaistore01 \
  --storage-account-key ciUzonyM... \
  --container-name share01 \
  --user-name tsmatsuz --password P@ssw0rd

Recall my previous post, in which we manually configured the computing nodes.
With Batch AI, this single command runs all of the configuration tasks, including MPI and inter-node communication settings. It significantly simplifies distributed training !

After you've created the cluster, please wait until "allocationState" becomes "steady" without any errors, checking with the "az batchai cluster show" command. (See the following results.)

# show status for cluster creation
az batchai cluster show \
  --name cluster01 \
  --resource-group testrg01


{
  "allocationState": "steady",
  "creationTime": "2017-12-18T07:01:00.424000+00:00",
  "currentNodeCount": 1,
  "errors": null,
  "location": "EastUS",
  "name": "cluster01",
  "nodeSetup": {
    "mountVolumes": {
      "azureBlobFileSystems": null,
      "azureFileShares": [
        {
          "accountName": "batchaistore01",
          "azureFileUrl": "https://batchaistore01.file.core.windows.net/share01",
          "credentials": {
            "accountKey": null,
            "accountKeySecretReference": null
          },
          "directoryMode": "0777",
          "fileMode": "0777",
          "relativeMountPath": "dir01"
        }
      ],
      "fileServers": null,
      "unmanagedFileSystems": null
    },
    "setupTask": null
  },
  ...
}

You can get the access information for the computing nodes with the following command.
As you can see, there are 2 computing nodes here: one is accessed by ssh on port 50000 and the other on port 50001. (Inbound NAT is used for accessing the nodes through a public address.)

# Show endpoints
az batchai cluster list-nodes \
  --name cluster01 \
  --resource-group testrg01


[
  {
    "ipAddress": "52.168.68.25",
    "nodeId": "tvm-4283973576_1-20171218t070031z",
    "port": 50000.0
  },
  {
    "ipAddress": "52.168.68.25",
    "nodeId": "tvm-4283973576_2-20171218t070031z",
    "port": 50001.0
  }
]

Log in to a computing node with your ssh client, and you will find that the storage container "share01" is mounted at the path "$AZ_BATCHAI_MOUNT_ROOT/bfs" (/mnt/batch/tasks/shared/LS_root/mounts/bfs in my environment) on the computing node. (You can change the mount point "bfs" with a cluster creation option.)
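For example, using the address and port returned by "az batchai cluster list-nodes" above and the user account specified at cluster creation, a plain ssh login to the first node looks like this:

# ssh into the first computing node (address and port from the list-nodes output)
ssh tsmatsuz@52.168.68.25 -p 50000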

When you set the same value for the minimum (min) and maximum (max) node count in the cluster creation options, auto-scaling is disabled and the cluster uses a fixed number of nodes.
When you set different values, auto-scaling is enabled automatically. (See the following results.)
You can check whether auto-scaling is enabled with the "az batchai cluster show" command.

Note : When using the fixed nodes, you can change the node count with “az batchai cluster resize“. When using auto-scaling, you can change the auto-scale configuration with “az batchai cluster auto-scale“.

# creating fixed nodes...
az batchai cluster create --name cluster01 \
  --resource-group testrg01 \
  --location eastus \
  --min 2 --max 2 ...


{
  "allocationState": "steady",
  "scaleSettings": {
    "autoScale": null,
    "manual": {
      "nodeDeallocationOption": "requeue",
      "targetNodeCount": 2
    }
  },
  ...
}
# creating auto-scaling nodes...
az batchai cluster create --name cluster01 \
  --resource-group testrg01 \
  --location eastus \
  --min 1 --max 3 ...


{
  "allocationState": "steady",
  "scaleSettings": {
    "autoScale": {
      "initialNodeCount": 0,
      "maximumNodeCount": 3,
      "minimumNodeCount": 1
    },
    "manual": null
  },
  ...
}

When your cluster is ready, you can publish the training job !

Before starting your job, you must first create the following JSON job definition.
With this definition, the command "python Train_MNIST.py {dir for training data} {dir for model output}" is launched via MPI (with the "mpirun" command). Therefore the training workload runs on all nodes in parallel, and the results are exchanged between the nodes.

Note : When you set "nodeCount" to 2 as follows, the 2 computing nodes are written to the MPI host file (located at $AZ_BATCHAI_MPI_HOST_FILE), and this file is used by the "mpirun" command.

Note : Here we use the settings for CNTK (Cognitive Toolkit), but you can also specify framework settings for TensorFlow, Caffe (incl. Caffe2), and Chainer with Batch AI.
I'll explain more about the job definition file later.

job.json

{
  "properties": {
    "nodeCount": 2,
    "cntkSettings": {
      "pythonScriptFilePath": "$AZ_BATCHAI_MOUNT_ROOT/bfs/Train_MNIST.py",
      "commandLineArgs": "$AZ_BATCHAI_MOUNT_ROOT/bfs $AZ_BATCHAI_MOUNT_ROOT/bfs/model"
    },
    "stdOutErrPathPrefix": "$AZ_BATCHAI_MOUNT_ROOT/bfs/output"
  }
}

You can run this job with the following command.
As I mentioned in my early post (see "Azure Batch – Walkthrough and How it works"), the job runs under an auto-user account named "_azbatch", not under the provisioned user account ("tsmatsuz" in my sample).

# run job !
az batchai job create --name job01 \
  --cluster-name cluster01 \
  --resource-group testrg01 \
  --location eastus \
  --config job.json

After you publish the job, you can see the job processing state (queued, running, succeeded, etc.) with the following command.

# see the job status
az batchai job list -o table


Name    Resource Group    Cluster    Cluster RG    Tool      Nodes  State        Exit code
------  ----------------  ---------  ------------  ------  -------  ---------  -----------
job01   testrg01          cluster01  testrg01      cntk          2  succeeded            0

The results (the generated model and console output) are located in the shared (mounted) folder, and you can download these files. (As I'll explain later, you can also get the download URLs with a CLI command.)

When you want to terminate the queued job, you can run the following command.

# terminate job
az batchai job terminate \
  --name job01 \
  --resource-group testrg01

 

More about job definitions

Here I show you more about job definitions (job.json).

When you use your own favorite framework, like Keras or MXNet, that is not among the Batch AI-supported frameworks, you can use customToolkitSettings in the job definition.
For example, you can define the previous CNTK distributed training using "customToolkitSettings" as follows. (The result is the same as before.)

Using custom settings (job.json)

{
  "properties": {
    "nodeCount": 2,
    "customToolkitSettings": {
      "commandLine": "mpirun --mca btl_tcp_if_include eth0 --hostfile $AZ_BATCHAI_MPI_HOST_FILE python $AZ_BATCHAI_MOUNT_ROOT/bfs/Train_MNIST.py $AZ_BATCHAI_MOUNT_ROOT/bfs $AZ_BATCHAI_MOUNT_ROOT/bfs/model"
    },    
    "stdOutErrPathPrefix": "$AZ_BATCHAI_MOUNT_ROOT/bfs/output"
  }
}

In my previous example, I configured the command using only "$AZ_BATCHAI_MOUNT_ROOT" in the job definition (job.json). But you can also write the definition as follows for better modularity. (Here we're defining the extra variables "AZ_BATCHAI_INPUT_MYSCRIPT", "AZ_BATCHAI_INPUT_MYDATA", and "AZ_BATCHAI_OUTPUT_MYMODEL".)

With inputDirectories and outputDirectories (job.json)

{
  "properties": {
    "nodeCount": 2,
    "inputDirectories": [
      {
        "id": "MYSCRIPT",
        "path": "$AZ_BATCHAI_MOUNT_ROOT/bfs"
      },
      {
        "id": "MYDATA",
        "path": "$AZ_BATCHAI_MOUNT_ROOT/bfs"
      }
    ],
    "outputDirectories": [
      {
        "id": "MYMODEL",
        "pathPrefix": "$AZ_BATCHAI_MOUNT_ROOT/bfs",
        "pathSuffix": "model"
      }
    ],
    "cntkSettings": {
      "pythonScriptFilePath": "$AZ_BATCHAI_INPUT_MYSCRIPT/Train_MNIST.py",
      "commandLineArgs": "$AZ_BATCHAI_INPUT_MYDATA $AZ_BATCHAI_OUTPUT_MYMODEL"
    },
    "stdOutErrPathPrefix": "$AZ_BATCHAI_MOUNT_ROOT/bfs/output"
  }
}

The benefit of writing it this way is not only modularity.
When you use "outputDirectories" as above, you can get the details of the output files by the "outputDirectories" id as follows. Here there are 3 files (ConvNet_MNIST, ConvNet_MNIST.ckp, ConvNet_MNIST.dnn) in the directory AZ_BATCHAI_OUTPUT_MYMODEL.
(When you want the job's stdout and stderr, use "stdouterr" as the directory id, as in the example after the next listing.)

# list files in "MYMODEL"
az batchai job list-files \
  --name job03 \
  --output-directory-id MYMODEL \
  --resource-group testrg01

[
  {
    "contentLength": 415115,
    "downloadUrl": "https://batchaistore01.blob.core.windows.net/share01/b3a...",
    "name": ".../outputs/model/ConvNet_MNIST"
  },
  {
    "contentLength": 1127,
    "downloadUrl": "https://batchaistore01.blob.core.windows.net/share01/b3a...",
    "name": ".../outputs/model/ConvNet_MNIST.ckp"
  },
  {
    "contentLength": 409549,
    "downloadUrl": "https://batchaistore01.blob.core.windows.net/share01/b3a...",
    "name": ".../outputs/model/ConvNet_MNIST.dnn"
  }
]
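For example, to list the console output files of the job, you can run the same command with the special directory id "stdouterr":

# list stdout / stderr files of the job
az batchai job list-files \
  --name job03 \
  --output-directory-id stdouterr \
  --resource-group testrg01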

You can also view the results with integrated UI in Azure Portal or AI tools, etc.

Azure Portal

AI tools for Visual Studio

In my previous example, we used only built-in Python modules.
If you want to install additional modules, use the job preparation task as follows.

Invoking pre-task (job.json)

{
  "properties": {
    "nodeCount": 1,
    "jobPreparation": {
      "commandLine": "pip install mpi4py"
    },
    ...
  }
}

You can also specify the commands in a shell script file as follows.

Invoking pre-task (job.json)

{
  "properties": {
    "nodeCount": 1,
    "jobPreparation": {
      "commandLine": "bash $AZ_BATCHAI_MOUNT_ROOT/myprepare.sh"
    },
    ...
  }
}

As I mentioned earlier, you can use a Docker image instead of DSVM provisioning.
With the following definition, you can use a plain Ubuntu VM for the cluster creation and use a Docker image for provisioning the software.

With docker image (job.json)

{
  "properties": {
    "nodeCount": 2,
    "cntkSettings": {
      "pythonScriptFilePath": "$AZ_BATCHAI_MOUNT_ROOT/bfs/Train_MNIST.py",
      "commandLineArgs": "$AZ_BATCHAI_MOUNT_ROOT/bfs $AZ_BATCHAI_MOUNT_ROOT/bfs/model"
    },
    "stdOutErrPathPrefix": "$AZ_BATCHAI_MOUNT_ROOT/bfs/output",
    "containerSettings": {
      "imageSourceRegistry": {
        "image": "microsoft/cntk:2.1-gpu-python3.5-cuda8.0-cudnn6.0"
      }
    }
  }
}

 

Next I'll show you the Azure Distributed Data Engineering Toolkit (aztk), which is also built on top of Azure Batch and provides a Spark cluster with Docker images.
Enjoy your AI work with Azure !

 

[Reference]

Github – Azure Batch AI recipes
https://github.com/Azure/BatchAI/tree/master/recipes

Walkthrough – Distributed training with CNTK (Cognitive Toolkit)

The advantage of CNTK is not only performance; it can also support multiple devices (GPUs) or multiple machines in a flexible way through built-in capabilities, and it scales rapidly with the number of processes.

In my old post I showed the idea of distributed training with MXNet and R samples (see "Accelerate Mxnet R trainig by GPUs and multiple machines"). Here I show you the same technique with Cognitive Toolkit (CNTK).
In this post we use Python, but you can also use the CNTK R bindings.

Setting up your machines

Before starting, you must provision and configure your computing machines (nodes) as follows. If you run distributed training with multiple machines, all machines should be configured the same way.

  • Configure the network for inter-node communication.
  • When using GPUs, install (compile) and configure the drivers and related libraries (CUDA, cuDNN, etc).
  • Install and set up OpenMPI on the machines.
  • Put the following program (.py) and data on all machines. It's better to use an NFS shared (mounted) directory and place all files in this directory.

Moreover, you must set up trust between the host machines.
Here we create a key pair on the local machine using the following command. (The key pair "id_rsa" and "id_rsa.pub" is generated in the .ssh folder. During creation, we leave the passphrase blank.)
After that, copy the generated public key (id_rsa.pub) into the {home of the same user id}/.ssh directory on the other remote machines. The file name must be changed to "authorized_keys". (See the example after the listing below.)

ssh-keygen -t rsa

ls -al .ssh

drwx------ 2 tsmatsuz tsmatsuz 4096 Feb 21 05:01 .
drwxr-xr-x 7 tsmatsuz tsmatsuz 4096 Feb 21 04:52 ..
-rw------- 1 tsmatsuz tsmatsuz 1766 Feb 21 05:01 id_rsa
-rw-r--r-- 1 tsmatsuz tsmatsuz  403 Feb 21 05:01 id_rsa.pub
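For example, one simple way to copy the public key to the remote host (10.0.0.5 here) and register it as "authorized_keys" is the following one-liner; this is just a sketch and assumes the same user name exists on the remote host.

# append the local public key to authorized_keys on the remote host
cat ~/.ssh/id_rsa.pub | ssh tsmatsuz@10.0.0.5 "mkdir -p ~/.ssh && cat >> ~/.ssh/authorized_keys"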

Let's confirm that you can run a command (pwd) on the remote host with the following command without any prompt. If it succeeds, the current working directory on the remote host is returned. (Here we assume that 10.0.0.5 is the IP address of the remote host.)

ssh -o StrictHostKeyChecking=no 10.0.0.5 -p 22 pwd

/home/tsmatsuz

The easiest way to provision is to use the Data Science Virtual Machine (DSVM) in Azure, or to use a Docker image (microsoft/cntk:2.1-gpu-python3.5-cuda8.0-cudnn6.0, etc.) with a Docker cluster. Using DSVM, you don't need to set up all the dependent libraries; they are all automatically pre-configured, including TensorFlow, CNTK, MXNet, etc. with GPU support. The only thing you have to do is configure the inter-node communication.
You can also use an InfiniBand network for inter-node communication on Azure infrastructure.

Note : In my next post I'll show you the easiest way to provision, using Azure Batch AI. With Batch AI, you can provision your computing nodes, including the inter-node communication configuration, with only one command !

Our Sample – Single process

Now let's start programming. Here we use a brief MNIST (handwritten digit image) sample.

First, download the sample data (Train-28x28_cntk_text.txt and Test-28x28_cntk_text.txt) from the Batch AI sample (BatchAIQuickStart.zip). These files are in CNTK text format (CTF), with a one-hot vector (values 0 or 1) as "labels" and 28 x 28 (= 784) gray-scale pixel values (0-255) as "features", as follows.

Train-28x28_cntk_text.txt, Test-28x28_cntk_text.txt

|labels 0 0 0 0 0 0 0 1 0 0 |features 0 3 18 18 126...
|labels 0 1 0 0 0 0 0 0 0 0 |features 0 0 139 0 253...
...
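
To make the format concrete, here is a small illustrative sketch (not part of the sample; the helper name to_ctf_line is just for illustration) that builds one CTF record from a digit label (0-9) and a 784-element pixel array.

import numpy as np

def to_ctf_line(label, pixels):
  # one-hot encode the digit into the 10 "labels" values (0 or 1)
  onehot = np.zeros(10, dtype=int)
  onehot[label] = 1
  return "|labels {} |features {}".format(
    " ".join(str(v) for v in onehot),
    " ".join(str(int(p)) for p in pixels))

# label 7 with blank pixels -> "|labels 0 0 0 0 0 0 0 1 0 0 |features 0 0 ..."
print(to_ctf_line(7, np.zeros(784)))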

Here we start with the following brief training code for a convolutional neural network (LeNet-style). As you can see, this code saves the trained model after the training is done.

Train_MNIST.py

import numpy as np
import sys
import os
import cntk

# folder which includes "Train-28x28_cntk_text.txt"
data_path = sys.argv[1]
# folder for model output
model_path = sys.argv[2]

def train_mnist():
  # variables denoting the features and label data
  input_var = cntk.input_variable((1, 28, 28))
  label_var = cntk.input_variable(10)

  # instantiate the feedforward classification model
  with cntk.layers.default_options(
    init = cntk.layers.glorot_uniform(),
    activation=cntk.ops.relu):
    h = input_var/255.0
    h = cntk.layers.Convolution2D((5,5), 32, pad=True)(h)
    h = cntk.layers.MaxPooling((3,3), (2,2))(h)
    h = cntk.layers.Convolution2D((3,3), 48)(h)
    h = cntk.layers.MaxPooling((3,3), (2,2))(h)
    h = cntk.layers.Convolution2D((3,3), 64)(h)
    h = cntk.layers.Dense(96)(h)
    h = cntk.layers.Dropout(0.5)(h)
    z = cntk.layers.Dense(10, activation=None)(h)

  # create source for training data
  reader_train = cntk.io.MinibatchSource(
    cntk.io.CTFDeserializer(
      os.path.join(data_path, 'Train-28x28_cntk_text.txt'),
      cntk.io.StreamDefs(
        features = cntk.io.StreamDef(field='features', shape=28 * 28),
        labels = cntk.io.StreamDef(field='labels', shape=10)
      )),
      randomize=True,
      max_sweeps = cntk.io.INFINITELY_REPEAT)  

  # create trainer
  epoch_size = 60000
  minibatch_size = 64
  max_epochs = 40

  lr_schedule = cntk.learning_rate_schedule(
    0.2,
    cntk.UnitType.minibatch)
  learner = cntk.sgd(
    z.parameters,
    lr_schedule)

  progress_printer = cntk.logging.ProgressPrinter(
    tag='Training',
    num_epochs=max_epochs)
  ce = cntk.losses.cross_entropy_with_softmax(
    z,
    label_var)
  pe = cntk.metrics.classification_error(
    z,
    label_var)
  trainer = cntk.Trainer(
    z,
    (ce, pe),
    learner,
    progress_printer)

  # define mapping from reader streams to network inputs
  input_map = {
    input_var : reader_train.streams.features,
    label_var : reader_train.streams.labels
  }

  cntk.logging.log_number_of_parameters(z) ; print()

  # Train with minibatches
  for epoch in range(max_epochs):
    sample_count = 0
    while sample_count < epoch_size:
      # fetch minibatch
      data = reader_train.next_minibatch(
        min(minibatch_size, epoch_size - sample_count),
        input_map=input_map)
      # update model with it
      trainer.train_minibatch(data)
      # count samples processed so far
      sample_count += data[label_var].num_samples

    # print result
    trainer.summarize_training_progress()

  # save model
  z.save(
    os.path.join(
      model_path,
      "ConvNet_MNIST.dnn"))

  return

if __name__=='__main__':
  train_mnist()

After the model is created, you can load the generated model and predict (evaluate) the values of handwritten images as follows.

Test_MNIST.py

import numpy as np
import sys
import os
import cntk
from cntk.ops.functions import load_model

# folder which includes "Test-28x28_cntk_text.txt"
data_path = sys.argv[1]
# folder for model data
model_path = sys.argv[2]

def test_mnist():
  # variables denoting the features and label data
  input_var = cntk.input_variable((1, 28, 28))
  label_var = cntk.input_variable(10)

  # load model
  z = load_model(os.path.join(
    model_path,
    "ConvNet_MNIST.dnn"))
  out = cntk.softmax(z)

  # load test data
  reader_test = cntk.io.MinibatchSource(
    cntk.io.CTFDeserializer(
      os.path.join(data_path, 'Test-28x28_cntk_text.txt'),
      cntk.io.StreamDefs(
        features  = cntk.io.StreamDef(field='features', shape=28 * 28),
        labels = cntk.io.StreamDef(field='labels', shape= 10)
      )),
      randomize=False,
      max_sweeps = 1)

  # fetch the first 25 rows as a minibatch
  test_size = 25
  data = reader_test.next_minibatch(
    test_size,
    input_map={
      input_var : reader_test.streams.features,
      label_var : reader_test.streams.labels
    })
  data_label = data[label_var].asarray()
  data_input = data[input_var].asarray()
  data_input = np.reshape(
    data_input,
    (test_size, 1, 28, 28))

  # predict
  predicted_result = [out.eval(data_input[i]) for i in range(len(data_input))]

  # find the index with the maximum probability
  pred = [np.argmax(predicted_result[i]) for i in range(len(predicted_result))]

  # actual label
  actual = [np.argmax(data_label[i]) for i in range(len(data_label))]

  # print result !
  print("Label:", actual[:25])
  print("Predicted:", pred)

  return

if __name__=='__main__':
  test_mnist()

This code runs as one single process. When you run the following commands, the arrays of predicted values and actual (true) values are printed.

# train and save the model
python Train_MNIST.py /home/tsmatsuz/work /home/tsmatsuz/work/model
# evaluate (predict) with generated model
python Test_MNIST.py /home/tsmatsuz/work /home/tsmatsuz/work/model

Modify our code for distributed training

With CNTK's built-in functions, you can easily change your program for distributed training.

Here is our code for the distributed training version. (The modified parts are marked with "# modified" comments in the code.)
As you can see, you use the built-in learner for distributed training (data_parallel_distributed_learner) instead of the local learner. Moreover, instead of calling the train_minibatch() method in a loop, you define a training session (session = training_session(...)) and start it with session.train().

Train_MNIST.py (Distributed)

# modified for the distributed training
# (the modified parts are marked with "# modified" comments below)

import numpy as np
import sys
import os
import cntk
from cntk.train.training_session import *

# folder which includes "Train-28x28_cntk_text.txt"
data_path = sys.argv[1]
# folder for model output
model_path = sys.argv[2]

# Creates and trains a feedforward classification model for MNIST images
def train_mnist():
  # variables denoting the features and label data
  input_var = cntk.input_variable((1, 28, 28))
  label_var = cntk.input_variable(10)

  # instantiate the feedforward classification model
  with cntk.layers.default_options(
    init = cntk.layers.glorot_uniform(),
    activation=cntk.ops.relu):
    h = input_var/255.0
    h = cntk.layers.Convolution2D((5,5), 32, pad=True)(h)
    h = cntk.layers.MaxPooling((3,3), (2,2))(h)
    h = cntk.layers.Convolution2D((3,3), 48)(h)
    h = cntk.layers.MaxPooling((3,3), (2,2))(h)
    h = cntk.layers.Convolution2D((3,3), 64)(h)
    h = cntk.layers.Dense(96)(h)
    h = cntk.layers.Dropout(0.5)(h)
    z = cntk.layers.Dense(10, activation=None)(h)

  # create source for training data
  reader_train = cntk.io.MinibatchSource(
    cntk.io.CTFDeserializer(
      os.path.join(data_path, 'Train-28x28_cntk_text.txt'),
      cntk.io.StreamDefs(
        features = cntk.io.StreamDef(field='features', shape=28 * 28),
        labels = cntk.io.StreamDef(field='labels', shape=10)
      )),
      randomize=True,
      max_samples = 60000,
      multithreaded_deserializer=True)

  # create trainer
  epoch_size = 60000
  minibatch_size = 64
  max_epochs = 40

  lr_schedule = cntk.learning_rate_schedule(
    0.2,
    cntk.UnitType.minibatch)
  learner = cntk.sgd(
    z.parameters,
    lr_schedule)
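  # modified: wrap the local learner with a distributed learner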
  dist_learner = cntk.train.distributed.data_parallel_distributed_learner(
    learner = learner,
    num_quantization_bits=32, # non-quantized SGD
    distributed_after=0) # all data is parallelized

  progress_printer = cntk.logging.ProgressPrinter(
    tag='Training',
    num_epochs=max_epochs,
    freq=100,
    log_to_file=None,
    rank=cntk.train.distributed.Communicator.rank(),
    gen_heartbeat=True,
    distributed_freq=None)
  ce = cntk.losses.cross_entropy_with_softmax(
    z,
    label_var)
  pe = cntk.metrics.classification_error(
    z,
    label_var)
  trainer = cntk.Trainer(
    z,
    (ce, pe),
    dist_learner,
    progress_printer)

  # define mapping from reader streams to network inputs
  input_map = {
    input_var : reader_train.streams.features,
    label_var : reader_train.streams.labels
  }

  cntk.logging.log_number_of_parameters(z) ; print()

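  # modified: use a training session instead of the manual minibatch loop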
  session = training_session(
    trainer=trainer,
    mb_source = reader_train,
    model_inputs_to_streams = input_map,
    mb_size = minibatch_size,
    progress_frequency=epoch_size,
    checkpoint_config = CheckpointConfig(
      frequency = epoch_size,
      filename = os.path.join(
        model_path, "ConvNet_MNIST"),
      restore = False),
    test_config = None
  )
  session.train()

  # save model only by one process
  if 0 == cntk.train.distributed.Communicator.rank():
    z.save(
      os.path.join(
        model_path,
        "ConvNet_MNIST.dnn"))

  # clean-up distribution
  cntk.train.distributed.Communicator.finalize()

  return

if __name__=='__main__':
  train_mnist()

CNTK supports several types of parallel (distributed) training algorithms, and here we're using data-parallel training (data_parallel_distributed_learner), which distributes the workload across mini-batches and aggregates (exchanges) the gradients from the multiple processes. (Here we only distribute the data and do not use the quantized gradient methodology.)
See "CNTK – Multiple GPUs and Machines" for the details about the parallel algorithms.
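
For reference, here is a minimal sketch (my own, not from the original post) of how you might switch between the parallel algorithms by wrapping the same local learner; the 1-bit SGD and block momentum variants assume a CNTK build that includes those components.

import cntk

def make_distributed_learner(local_learner, algorithm="data_parallel"):
  if algorithm == "data_parallel":
    # plain data-parallel SGD (no gradient quantization), as used in this post
    return cntk.train.distributed.data_parallel_distributed_learner(
      learner=local_learner,
      num_quantization_bits=32,
      distributed_after=0)
  elif algorithm == "data_parallel_1bit":
    # data-parallel SGD with 1-bit quantized gradient exchange
    return cntk.train.distributed.data_parallel_distributed_learner(
      learner=local_learner,
      num_quantization_bits=1,
      distributed_after=0)
  elif algorithm == "block_momentum":
    # block momentum SGD; block_size controls how often workers synchronize
    return cntk.train.distributed.block_momentum_distributed_learner(
      local_learner,
      block_size=3200)
  else:
    raise ValueError("unknown algorithm: " + algorithm)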

For example, when you run the following command on your computing host, 2 worker processes run on your computer and exchange the results with each other.
After the training is done, only one process (rank=0) saves the trained model (ConvNet_MNIST.dnn) in the model folder.
Then you can evaluate (predict) using the generated model the same as above. (See Test_MNIST.py for prediction.)

mpiexec --npernode 2 python Train_MNIST.py /home/tsmatsuz/work /home/tsmatsuz/work/model

When you run on multiple computing machines (nodes), first create the following hosts file on your working node. This file lists the computing nodes on which the training workloads will be started by MPI. (Here we use only 2 nodes for distributed training.)

hosts

10.0.0.4 slots=1 # 1 worker on node0
10.0.0.5 slots=1 # 1 worker on node1

Next, place the program (Train_MNIST.py) and data (Train-28x28_cntk_text.txt) in the shared (mounted) folder, and make sure that every node can access both the program and the data.

Finally, run the following command on your working node. ("/mnt/test" is the shared folder and "eth0" is the network interface name on my working node.)
Before training, CNTK looks for an InfiniBand network, and it is used automatically if available. Then the training workloads run on all nodes in parallel and the results are exchanged.

Note: You can suppress the lookup for the InfiniBand network with the "--mca btl ^openib" option.

mpiexec --mca btl_tcp_if_include eth0 \
  --hostfile hosts \
  python /mnt/test/Train_MNIST.py /mnt/test /mnt/test/model

 

Next time I'll show you Azure Batch AI. You will see how easy distributed training workloads become ! (This post is the preparation for the next lesson…)

 

[Reference]

Cognitive Toolkit – Multiple GPUs and Machines
https://docs.microsoft.com/en-us/cognitive-toolkit/Multiple-GPUs-and-machines

Azure Batch – Walkthrough and How it works

Batch is another side of the benefits of massive cloud computing. In this post and the next, I show you how to use Azure Batch and Batch AI and how they work; here I focus on Azure Batch fundamentals for your essential understanding.
(Note added 01/30/2018 : I added another post about Batch AI.)

When you use cloud computing resources for critical batch execution, you would otherwise have to handle all the work yourself, such as constructing infrastructure (Virtual Machines, Virtual Networks, etc with ARM templates), provisioning libraries and applications, providing queues, scaling, security, monitoring, collecting results, and clean-up. Azure Batch helps you do these batch provisioning and execution workloads at massive cloud computing scale.

In this post we mainly use the Azure CLI so that you can easily trace and check each step. But remember that you can also use the UI (Azure Portal), PowerShell, the REST API, or language SDKs such as .NET (C#), Java, Python, and R (the doAzureParallel and rAzureBatch packages).

Before starting …

Before starting, select "New" – "Batch Service" in the Azure Portal and create a Batch account. (We use the Azure Portal only for creating the batch account, but you can also create it with the Azure CLI.)

First you log in to the Azure Batch account with the Azure CLI as follows. In this post we're using the batch account named "test01" in the resource group "testrg01".
Here we're using the interactive login UI, but you can also log in silently with a service principal and password (see my old post "How to use Azure REST API with Certificate programmatically (without interactive UI)").

# you login Azure with interactive UI
az login
# you login batch account
az batch account login \
 --resource-group testrg01 \
 --name test01

Preparing batch pool (computing nodes)

Before executing a batch, you must prepare the application package and the batch pool.
The application package is an archive of executables (and related files) in compressed zip format. Compress your executable applications (.exe, .dll, etc) into a zip file, and upload this zipped file (myapp-exe.zip) as the batch application package with the following command.

# Upload and register your archive as application package
az batch application package create \
  --resource-group testrg01 \
  --name test01 \
  --application-id app01 \
  --package-file myapp-exe.zip \
  --version 1.0
# Set this version of package as default version
az batch application set \
  --resource-group testrg01 \
  --name test01 \
  --application-id app01 \
  --default-version 1.0

Next you must provision the batch pool with application packages as follows.
The batch pool is the set of computing nodes in Azure. Here we're creating 3 nodes with a Windows image (publisher "MicrosoftWindowsServer", offer "WindowsServer", sku "2016-Datacenter", node agent SKU ID "batch.node.windows amd64"), but of course you can use Linux computing nodes for the batch pool.

As you can see below, here we are using the Standard A1 virtual machine size, but you can also use the NC-series (which has Tesla GPUs) for machine learning workloads. Moreover, you can use the Data Science Virtual Machine (DSVM) for Azure Batch, which is a pre-configured machine (including Python, R, CNTK, TensorFlow, MXNet, etc with GPU support) for machine learning and deep learning.
Please see "Azure Batch – List of virtual machine images" for the supported machine images.

az batch pool create \
 --id pool01 \
 --target-dedicated 3 \
 --image MicrosoftWindowsServer:WindowsServer:2016-Datacenter:latest \
 --node-agent-sku-id "batch.node.windows amd64" \
 --vm-size Standard_A1 \
 --application-package-references app01#1.0

On batch pool creation, the packaged application is extracted into the directory "D:\batch\tasks\apppackages\{folder name derived from your package name}" (/mnt/batch/tasks/apppackages/{…} on Linux), and this path is referenced by the environment variable %AZ_BATCH_APP_PACKAGE_app01#1.0% ("app01" is your app package name and "1.0" is the version) in your batch process.
For the details about environment variables, please refer to "Azure Batch compute node environment variables".
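
For example, a small Python task script could locate these directories through the environment variables, as in the following sketch (the package variable name follows the AZ_BATCH_APP_PACKAGE_<id>#<version> convention of the Windows pool used in this post; "app01" and "1.0" are the sample values above).

import os

# extracted application package directory (Windows pool naming convention)
app_dir = os.environ["AZ_BATCH_APP_PACKAGE_app01#1.0"]
# working directory of the current task
task_dir = os.environ["AZ_BATCH_TASK_WORKING_DIR"]
# batch root directory on this node
node_dir = os.environ["AZ_BATCH_NODE_ROOT_DIR"]

print("application package :", app_dir)
print("task working dir    :", task_dir)
print("node root dir       :", node_dir)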

When some setup is needed on each node, you can use the start-task setting.
For example, when you need to install your package instead of extracting a zip archive, you can specify the following installation task (see the --start-task-command-line option) on pool creation. This setting copies https://mystorage.blob.core.windows.net/fol01/myapp.msi as myapp.msi into the current working directory and runs msiexec as a silent installation.

Note : The errors and output are written to files (stderr.txt and stdout.txt) in the %AZ_BATCH_NODE_STARTUP_DIR% directory (D:\batch\tasks\startup\ on Windows, /mnt/batch/tasks/startup on Linux) on each computing node.

az batch pool create \
 --id pool08 \
 --target-dedicated 3 \
 --image MicrosoftWindowsServer:WindowsServer:2016-Datacenter:latest \
 --node-agent-sku-id "batch.node.windows amd64" \
 --vm-size Standard_A1 \
 --application-package-references app08#1.0 \
 --start-task-command-line "cmd /c msiexec /i myapp.msi /quiet" \
 --start-task-resource-files "myapp.msi=https://mystorage.blob.core.windows.net/fol01/myapp.msi" \
 --start-task-wait-for-success

When you want to set up a distributed training environment with deep learning libraries (CNTK etc), you should install MPI with admin-elevated privileges. Unfortunately, there is no command option in the Azure Batch CLI for admin-elevated execution.
In such a case, you can specify the detailed parameters in json format as follows. This json has the same format as the Batch REST API or an ARM template. (See the reference for details.)

Note : See "Cognitive Toolkit : Multiple GPUs and Machines" for distributed training with CNTK. For distributed training with MXNetR, see my old post "Accelerate MXNet R training by multiple machines and GPUs".
For distributed training, some kind of mechanism for inter-node communication is needed.

command

# Install MPI in start-task with admin elevated privileges
# (see the following "pooldef.json")
az batch pool create --json-file pooldef.json

pooldef.json (parameter’s definition)

{
  "id": "pool01",
  "virtualMachineConfiguration": {
    "imageReference": {
      "publisher": "MicrosoftWindowsServer",
      "offer": "WindowsServer",
      "sku": "2016-Datacenter",
      "version": "latest"
    },
    "nodeAgentSKUId": "batch.node.windows amd64"
  },
  "vmSize": "Standard_A1",
  "targetDedicatedNodes": "3",
  "enableInterNodeCommunication": true,
  "maxTasksPerNode": 1,
  "applicationPackageReferences": [
    {
      "applicationId": "app01",
      "version": "1.0"
    }
  ],
  "startTask": {
    "commandLine": "cmd /c MSMpiSetup.exe -unattend -force",
    "resourceFiles": [
      {
        "blobSource": "https://mystorage.blob.core.windows.net/fol01/MSMpiSetup.exe",
        "filePath": "MSMpiSetup.exe"
      }
    ],
    "userIdentity": {
      "autoUser": {
        "elevationLevel": "admin"
      }
    },
    "waitForSuccess": true
  }
}

When you want to deploy pre-configured images, you can use custom VM images or docker images (containerized machine images) with the batch pool.

Note : There are two ways to use docker containers with Azure Batch.
The first approach is to use the start task to set up docker on the host machine (and run your batch task commands with the "docker run" command).
The second approach is to create the pool with a machine image running docker and a containerized image setting with the containerConfiguration parameter. With the second approach, your batch task commands run in docker containers, not directly on the virtual machine. See "Run container applications on Azure Batch" for details.

When the pool's status becomes "steady" and each computing node's state becomes "idle" without any errors, it's ready for your job execution !

# show pool's status
az batch pool show --pool-id pool01
{
  "id": "pool01",
  "allocationState": "steady",
  "applicationPackageReferences": [
    {
      "applicationId": "app07",
      "version": "1.0"
    }
  ],
  "currentDedicatedNodes": 3,
  "currentLowPriorityNodes": 0,
  "enableAutoScale": false,
  "enableInterNodeCommunication": false,
  "state": "active",
  ...
}
# show node's state
az batch node list --pool-id pool01
[
  {
    "id": "tvm-1219235766_1-20171207t021610z",
    "affinityId": "TVM:tvm-1219235766_1-20171207t021610z",
    "state": "idle",
    "errors": null,
    "endpointConfiguration": {
      "inboundEndpoints": [
        {
          "backendPort": 3389,
          "frontendPort": 50000,
          "name": "SSHRule.0",
          "protocol": "tcp",
          "publicFqdn": "dnsazurebatch-7e6395de-5c8f-444a-9cb6-f7bd809e2d3c-c.westus.cloudapp.azure.com",
          "publicIpAddress": "104.42.129.75"
        }
      ]
    },
    "ipAddress": "10.0.0.4",
    ...
  },
  {
    "id": "tvm-1219235766_2-20171207t021610z",
    "affinityId": "TVM:tvm-1219235766_2-20171207t021610z",
    "state": "idle",
    "errors": null,
    "endpointConfiguration": {
      "inboundEndpoints": [
        {
          "backendPort": 3389,
          "frontendPort": 50002,
          "name": "SSHRule.2",
          "protocol": "tcp",
          "publicFqdn": "dnsazurebatch-7e6395de-5c8f-444a-9cb6-f7bd809e2d3c-c.westus.cloudapp.azure.com",
          "publicIpAddress": "104.42.129.75"
        }
      ]
    },
    "ipAddress": "10.0.0.6",
    ...
  },
  {
    "id": "tvm-1219235766_3-20171207t021610z",
    "affinityId": "TVM:tvm-1219235766_3-20171207t021610z",
    "state": "idle",
    "errors": null,
    "endpointConfiguration": {
      "inboundEndpoints": [
        {
          "backendPort": 3389,
          "frontendPort": 50001,
          "name": "SSHRule.1",
          "protocol": "tcp",
          "publicFqdn": "dnsazurebatch-7e6395de-5c8f-444a-9cb6-f7bd809e2d3c-c.westus.cloudapp.azure.com",
          "publicIpAddress": "104.42.129.75"
        }
      ]
    },
    "ipAddress": "10.0.0.5",
    ...
  }
]

Manage your pool (computing nodes)

The batch pool is based on Azure Virtual Machine Scale Sets (VMSS) technology, and you can manage these nodes like VMSS with the batch CLI.

For example, you can easily change the number of nodes (scale out and scale in) with the following command. Here we're changing the number of nodes to 5.

az batch pool resize \
  --pool-id pool01 \
  --target-dedicated-nodes 5

Only the nodes are charged in Azure Batch. If you want to configure for cost-effective consumption, you can also enable automatic scaling (auto-scaling) for the pool, as in the sketch below. Moreover, you can use low-priority VMs for the batch pool, which are very low-priced virtual machines. (We used on-demand VMs in the examples above.)
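
As a sketch, enabling auto-scaling with the azure-batch Python SDK might look like the following; the account key, region URL, and the simple pending-tasks formula are placeholders, and parameter names follow the SDK of that era.

import datetime
import azure.batch.batch_auth as batch_auth
from azure.batch import BatchServiceClient

credentials = batch_auth.SharedKeyCredentials("test01", "<account key>")
client = BatchServiceClient(credentials, "https://test01.westus.batch.azure.com")

# scale the pool to the number of pending tasks, capped at 10 nodes
formula = "$TargetDedicatedNodes = min(10, max(0, $PendingTasks.GetSample(1)));"
client.pool.enable_auto_scale(
  pool_id="pool01",
  auto_scale_formula=formula,
  auto_scale_evaluation_interval=datetime.timedelta(minutes=5))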

By default, tasks run under an auto-user account (named "_azbatch"). This account is a built-in user account created automatically by the Batch service. When you want to log in to each computing node, you can add a named user account instead of using the auto-user account.
The following adds a named user account to the node whose node id is "tvm-1219235766_1-20171207t021610z". (This operation can also be done in the Azure Portal UI.)

az batch node user create \
  --pool-id pool07 \
  --node-id tvm-1219235766_1-20171207t021610z \
  --name tsmatsuz \
  --password P@ssw0rd \
  --is-admin

Using the named user account, you can connect to each computing node with RDP (Windows) or SSH (Linux).
With the previous "az batch node list" command, you can get the public IP and the public port mapped by inbound NAT (port 3389 for RDP). (See the previous command results.) Or use the "az batch node remote-login-settings show" command for the remote access addresses.

Job and Tasks

One batch execution is controlled by a logical operation called a "job", and each process (command execution) is managed by a "task". One job has many tasks, and the tasks are launched in parallel or with dependencies.

The typical flow of batch execution is as follows :

  1. Create job (which is associated with the pool)
  2. Add tasks to the previous job
  3. Each task is automatically queued and scheduled for execution on the computing nodes.

You can also assign a job manager task, which controls the completion of all other tasks in the job. In this post we don't use a job manager task (jobManagerTask=null) for simplicity, and we only use tasks executed directly under the job.

How do you pass the results ? It depends on your design.
A task can write its result to a file, or the task's standard output is written to a text file. After all tasks complete, you can collect (download) these outputs from the VMs and merge them into one result.
Another approach is to pass a connection (connection string, credentials, etc) for a shared storage (blob, database, etc) to the tasks, and all tasks write their results into this shared storage. (A small sketch of this approach follows.)
In this post, we use the first approach.
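
As a rough sketch of the second approach (shared storage), a task could write its result to a blob container like this, assuming the classic azure-storage BlockBlobService client and storage credentials passed to the task (for example, as task environment settings); the account, key, and container names here are placeholders.

import os
from azure.storage.blob import BlockBlobService

# storage credentials passed to the task by the job submitter (placeholders)
blob_service = BlockBlobService(
  account_name=os.environ["RESULT_STORAGE_ACCOUNT"],
  account_key=os.environ["RESULT_STORAGE_KEY"])

# write one result blob per task, named by the built-in task id variable
result_text = "result of this task ..."
blob_service.create_blob_from_text(
  container_name="batchresults",
  blob_name=os.environ["AZ_BATCH_TASK_ID"] + ".txt",
  text=result_text)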

Now let's see the following example. (As I mentioned above, %AZ_BATCH_APP_PACKAGE_app01#1.0% is an environment variable available to the batch task.)
After you add a task to a job, the task is automatically scheduled for execution. Even when all tasks are completed, no action occurs by default. Therefore you must set "terminateJob" as the --on-all-tasks-complete option, so that the job completes automatically after all tasks are completed.

Note : You can also run your task as an admin-elevated process, the same as the previous start-task. (Specify a json file and set the detailed parameters the same way as the previous start-task.)

# Create Job
az batch job create \
  --id job01 \
  --pool-id pool01
# Create Tasks
az batch task create \
  --job-id job01 \
  --task-id task01 \
  --application-package-references app01#1.0 \
  --command-line "cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe"
az batch task create \
  --job-id job01 \
  --task-id task02 \
  --application-package-references app01#1.0 \
  --command-line "cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe"
az batch task create \
  --job-id job01 \
  --task-id task03 \
  --application-package-references app01#1.0 \
  --command-line "cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe"
az batch task create \
  --job-id job01 \
  --task-id task04 \
  --application-package-references app01#1.0 \
  --command-line "cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe"
az batch task create \
  --job-id job01 \
  --task-id task05 \
  --application-package-references app01#1.0 \
  --command-line "cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe"
az batch task create \
  --job-id job01 \
  --task-id task06 \
  --application-package-references app01#1.0 \
  --command-line "cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe"
# When all tasks are completed, the job will complete
az batch job set \
  --job-id job01 \
  --on-all-tasks-complete terminateJob
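
For reference, the same job and task creation can be scripted with the azure-batch Python SDK (one of the SDKs mentioned earlier). The following is only a sketch, with the account key and region URL as placeholders; it adds the six tasks in a single call instead of repeating the CLI command.

import azure.batch.batch_auth as batch_auth
import azure.batch.models as batchmodels
from azure.batch import BatchServiceClient

credentials = batch_auth.SharedKeyCredentials("test01", "<account key>")
client = BatchServiceClient(credentials, "https://test01.westus.batch.azure.com")

# create the job associated with the existing pool
client.job.add(batchmodels.JobAddParameter(
  id="job01",
  pool_info=batchmodels.PoolInformation(pool_id="pool01")))

# add the six tasks in one request
app_ref = batchmodels.ApplicationPackageReference(
  application_id="app01", version="1.0")
tasks = [
  batchmodels.TaskAddParameter(
    id="task%02d" % i,
    command_line="cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe",
    application_package_references=[app_ref])
  for i in range(1, 7)]
client.task.add_collection("job01", tasks)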

You can monitor the state ("active", "completed", etc) of your job and tasks as follows.

# show job's state
az batch job show --job-id job01
{
  "creationTime": "2017-12-12T05:41:31.254257+00:00",
  "executionInfo": {
    "endTime": null,
    "poolId": "pool01",
    "schedulingError": null,
    "startTime": "2017-12-12T05:41:31.291277+00:00",
    "terminateReason": null
  },
  "id": "job01",
  "previousState": null,
  "previousStateTransitionTime": null,
  "state": "active",
  ...
}
# show task's state
az batch task show --job-id job01 --task-id task01
{
  "id": "task01",
  "commandLine": "cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\\\myapp.exe",
  "constraints": {
    "maxTaskRetryCount": 0,
    "maxWallClockTime": "10675199 days, 2:48:05.477581",
    "retentionTime": "10675199 days, 2:48:05.477581"
  },
  "creationTime": "2017-12-12T05:42:08.762078+00:00",
  "executionInfo": {
    "containerInfo": null,
    "endTime": null,
    "exitCode": null,
    "failureInfo": null,
    "lastRequeueTime": null,
    "lastRetryTime": null,
    "requeueCount": 0,
    "result": null,
    "retryCount": 0,
    "startTime": null
  },
  "previousState": null,
  "previousStateTransitionTime": null,
  "state": "active",
  "stateTransitionTime": "2017-12-12T05:42:08.762078+00:00",
  ...
}

The task's standard output is written to a file on the computing node. You can download these files with the following command.
Here the output file (stdout.txt) is saved (downloaded) as C:\tmp\node01-stdout.txt on the local computer (which is running your Azure CLI). You can view all files on the computing node using the "az batch node file list --recursive" command.

az batch node file download \
  --file-path "D:\\batch\\tasks\\workitems\\job01\\job-1\\task01\\stdout.txt" \
  --destination "C:\\tmp\\node01-stdout.txt" \
  --node-id tvm-1219235766_1-20171207t021610z \
  --pool-id pool01

A task is randomly assigned to a computing node, and the maximum number of concurrently running tasks on each node is 1 by default.
As a result, the tasks are scheduled and executed one at a time on each node.

If you set "2" as maxTasksPerNode on pool creation (see the previous pooldef.json above), two tasks can run concurrently on each node.

If a task's inputs depend on the results of other tasks, you can also run the tasks with dependencies. (Use the usesTaskDependencies option on job creation and the dependsOn parameter on task creation, as in the sketch below.)
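
Here is a minimal sketch of such a dependency with the azure-batch Python SDK (again only illustrative; the ids follow the examples above): the job allows dependencies, and "task02" runs only after "task01" has completed.

import azure.batch.models as batchmodels

# usesTaskDependencies must be enabled when the job is created
job = batchmodels.JobAddParameter(
  id="job02",
  pool_info=batchmodels.PoolInformation(pool_id="pool01"),
  uses_task_dependencies=True)

# dependsOn : this task is scheduled only after "task01" has completed
dependent_task = batchmodels.TaskAddParameter(
  id="task02",
  command_line="cmd /c %AZ_BATCH_APP_PACKAGE_app01#1.0%\\myapp.exe",
  depends_on=batchmodels.TaskDependencies(task_ids=["task01"]))

# then submit with client.job.add(job) and client.task.add("job02", dependent_task)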

Deliver Your Own Excel Custom Functions (JavaScript Add-in) for Users

As you know, you have been able to build your own custom functions (UDF) with VBA for a long time. But if you're an ISV, you must deliver your functions to all your customers with an easy installation, and VBA doesn't work well in such a case.
With the new capability in Excel web add-ins, you can deliver your functions all over the world through the marketplace. You can deliver your own advanced Excel functions, such as AI-integrated functions.

For example, you can easily expose your AI-driven web service with the new Azure Machine Learning (v2) and Python, and the user can then consume this service with the Excel functions you provide. (The user doesn't need to know about the web services or a complicated installation process.)
This concept was shown at Microsoft Ignite 2017; see "Announcing tools for the AI-driven digital transformation" on the team blog. This will be a sample of a 1st-party implementation of this experience.

The following are the advantages of these new custom functions. (As noted in the "Note" items below, this capability is currently in Preview and several limitations exist.)

  • It's JavaScript. It can run on any platform like Mac, Online, Mobile, etc.
    (Note : But for now you must use the Excel Windows desktop client in the current Preview.)
  • Are you a power user or a professional developer ? For the latter, once you've created your custom functions, you can submit and deliver them through the marketplace (Office store).
    (Note : But you cannot submit your custom function add-in in the current Preview.)

It's now in Preview and you need an Office Insider version (build 8711 or later) to run it. Therefore please sign up for Office Insider before starting.

Let’s dive into 5 minutes’ example !

Logic and Registration

First you must create an html page (.html) and a script file (.js), and host (expose) them on the internet.
The following is our example. Here we are using the famous chance.js library, which can be downloaded (copied) from the chance.js site.

mypage.html

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=Edge" />
  <meta http-equiv="Expires" content="0" />
  <title></title>
  <script src="https://appsforoffice.edog.officeapps.live.com/lib/beta/hosted/office.js" type="text/javascript"></script>
  <script src="https://example.com/chance.js" type="text/javascript"></script>
  <script src="https://example.com/myfunc.js" type="text/javascript"></script>
</head>
<body>
</body>
</html>

myfunc.js

Office.initialize = function(reason){
  function getrandom(x) {
    if (x == "phone")
      return chance.phone();
    else if (x == "zip")
      return chance.zip();
    else if (x == "address")
      return chance.address();
    else
      return "not supported";
  }
  
  Excel.Script.CustomFunctions = {};
  Excel.Script.CustomFunctions["TEST"] = {};
  Excel.Script.CustomFunctions["TEST"]["RANDOM"] = {
    call: getrandom,
    description: "Create various random data (phone, zip, etc)",
    result: {
      resultType: Excel.CustomFunctionValueType.string,
      resultDimensionality: Excel.CustomFunctionDimensionality.scalar,
    },
    parameters: [
      {
        name: "type of data",
        description: "Which type of data you need (phone, zip, etc)",
        valueType: Excel.CustomFunctionValueType.string,
        valueDimensionality: Excel.CustomFunctionDimensionality.scalar,
      }
    ]
  };
  
  Excel.run(function (context) {    
    context.workbook.customFunctions.addAll();
    return context.sync().then(function(){});
  }).catch(function(error){});
};

Let's see what this code is doing.
First, getrandom() is the logic of our custom function. As you can see, here we're just creating various random values using chance.js. If your custom function retrieves data from the web (i.e., async IO is invoked), you need an asynchronous function. (See "Github : Create custom functions in Excel (Preview)" for details about asynchronous functions.)

Note : It seems that some libraries (dynamically loaded libraries and so on) cannot be used in custom functions at the moment.

Next we must define our custom function with Excel.Script.CustomFunctions (see the Script.CustomFunctions reference) before registration. Here we're using scalar values for the input parameter and the output result, but if you need lookups over Excel ranges, you can also use matrix dimensionality for parameters.

When the user adds this add-in, the definition is registered by context.workbook.customFunctions.addAll(). Once the custom function is registered, the user can use it in all workbooks in Excel.
If you want to delete the registered custom functions, use deleteAll() instead in the same add-in application. (See the following.)

Note : Or you can delete registered custom functions by removing %userprofile%\AppData\Local\Microsoft\Office\16.0\Wef\CustomFunctions folder in your local machine.

Office.initialize = function(reason){
  Excel.run(function (context) {
    // check if exists your custom function
    ...
    context.workbook.customFunctions.deleteAll();
    ...
  }).catch(function(error){});
};

Manifest

Now you are ready to register your custom functions in Excel.
In the current Preview, only sideloading installation (the installation process for developers) is supported. Therefore you must create a shared folder on your local computer, and set this folder as a Trusted Add-in Catalog with the following steps.
In the future, the user will be able to install from the marketplace with a few clicks !

  1. Open Excel
  2. Select “File” – “Options” menu.
  3. Select “Trust Center” tab and push “Trust Center Settings” button.
  4. Add your shared folder to the Trusted Catalogs Table.

Please create the following manifest file (UTF-8 encoding) and place this file (.xml) in the shared folder.

<?xml version="1.0" encoding="utf-8"?>
<OfficeApp xmlns="http://schemas.microsoft.com/office/appforoffice/1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:bt="http://schemas.microsoft.com/office/officeappbasictypes/1.0" xmlns:ov="http://schemas.microsoft.com/office/taskpaneappversionoverrides" xsi:type="TaskPaneApp">
  <Id>a4b7679d-7758-48a7-8bf3-7fb37fc2f20b</Id>
  <Version>1.0.0.0</Version>
  <ProviderName>Tsuyoshi Matsuzaki</ProviderName>
  <DefaultLocale>en-US</DefaultLocale>
  <DisplayName DefaultValue="My test functions" />
  <Description DefaultValue="custom functions test" />
  <Hosts>
    <Host Name="Workbook" />
  </Hosts>
  <DefaultSettings>
    <SourceLocation DefaultValue="https://example.com/mypage.html"/>
  </DefaultSettings>
  <Permissions>ReadWriteDocument</Permissions>
  <VersionOverrides xmlns="http://schemas.microsoft.com/office/taskpaneappversionoverrides" xsi:type="VersionOverridesV1_0">
    <Hosts>
      <Host xsi:type="Workbook">
        <AllFormFactors>
          <ExtensionPoint xsi:type="CustomFunctions">
            <Script>
              <SourceLocation resid="functionsjs" />
              <SourceLocation resid="chancejs" />
            </Script>
            <Page>
              <SourceLocation resid="pagehtml"/>
            </Page>
          </ExtensionPoint>
        </AllFormFactors>
      </Host>
    </Hosts>
    <Resources>
      <bt:Urls>
        <bt:Url id="functionsjs" DefaultValue="https://example.com/myfunc.js" />
        <bt:Url id="chancejs" DefaultValue="https://example.com/chance.js" />
        <bt:Url id="pagehtml" DefaultValue="https://example.com/mypage.html" />
      </bt:Urls>
    </Resources>
  </VersionOverrides>
</OfficeApp>

You may wonder why the JavaScript references are needed in this manifest (see the <Script> element above). In fact, this tag is not used in the current Preview, but in the future the script will be downloaded so that it can be used offline and packaged. Once the custom function is installed, it will be able to run offline in Excel.

Run !

Now let’s see how it works.

Open Excel and select "My add-ins" on the "Insert" tab. Then you can find your custom add-in.

After you add this add-in in your client, you can use your custom functions with auto-complete suggestions, hints, help, etc.

Of course, you can combine several existing Excel capabilities like "auto-fill". Users can accelerate their work by integrating many Excel capabilities with your advanced functions.

All functions are called (invoked) simultaneously but executed one at a time. (Because JavaScript is single-threaded…) In the future, you will be able to use batch calling, which aggregates multiple calls into one function call to improve performance.

You can also use a streamed function, which can dynamically update a cell with, for example, IoT sensor inputs. (An associated chart is also updated in real time.)

You can download the complete example below from Microsoft. Please try it.

Github: Create custom functions in Excel
https://github.com/OfficeDev/office-js-docs/blob/master/docs/excel/custom-functions-overview.md

 

This is just the initial Preview. The capabilities and performance will improve in the future. (Currently, the add-ins rely on a hidden browser process, but they will run in the same process in the future.)
Let's enjoy it and look forward to future updates !