Python client for “Analytics Engine powered by Apache Spark” service APIs on IBM Cloud Pak for Data

Sunil Ganatra
6 min read · Nov 27, 2020

Every company has an AI strategy on its roadmap, and Spark is one of the pillars of building a Data & AI pipeline. Users can write Spark applications to collect, analyze, refine, visualize and extract meaningful insights out of structured or unstructured data. IBM Cloud Pak for Data is an answer for customers who want to build their Data & AI pipeline, and the “Analytics Engine powered by Apache Spark” service is one of the bedrocks of IBM Cloud Pak for Data. You can read more about the “Analytics Engine powered by Apache Spark” service in this BLOG.

The “Analytics Engine powered by Apache Spark” service already has rich REST API documentation. Any user who wants to automate Spark job submission can leverage those APIs.

The purpose of this blog is to share details of the Python client, which abstracts the usage of these APIs and makes them simpler to use for personas like data scientists.

Note: I will use the terms Spark application and Spark job interchangeably. This Python client is tested on [CPD 3.0.1 + cpd-3.0.1-spark-patch-2] and CPD 3.5.0.

To submit a Spark application/job with Analytics Engine:

  • You have to provision an Analytics Engine instance, which is a logical entity that groups a set of applications under one umbrella. Since no resources are bound to an instance, you can have multiple instances; there is no limit on that.
  • One way of persisting a Spark application is to store it in a volume service instance. Other ways of persisting Spark applications are documented HERE.

What the volume service is, how it can be used, and what its benefits are is very well documented. In this use case we will cover how to persist a Spark application in a volume service instance.

  • Submit the Spark application, analyze/monitor its execution, and download your application logs.
Working with the Analytics Engine

Let’s see how we can do all this using the Python client. Follow the steps provided in the Python Client git repo to install the client package. Make sure you have connectivity to Cloud Pak for Data from the machine where you have installed the client.

For ease, I have initialized a few variables in my Python code: my Cloud Pak for Data hostname, username and password; the Analytics Engine instance name [SPARK_INSTANCE] under which we will submit Spark applications; the volume service instance [SPARK_INSTANCE_VOLUME] which we will use as the instance home of our AE instance; and finally, a volume service instance [APP_VOLUME_INSTANCE] which I will use to persist my Spark applications.

CloudPakForData_HOSTNAME = "REPLACE_ME_WITH_CPD_HOSTNAME"
USER = "REPLACE_ME_WITH_USERNAME"
PASSWORD = "REPLACE_ME_WITH_PASSWORD"
SPARK_INSTANCE = "spark-inst1"
SPARK_INSTANCE_VOLUME = "spark-inst1-volume"
APP_VOLUME_INSTANCE = "app-vol"

Create an object of AnalyticEngineClient
We need to import the AnalyticEngineClient class from the package, then initialize a client object by passing the hostname of Cloud Pak for Data along with your username and password for the Cloud Pak for Data platform.

# Analytics Engine Powered by apache spark Python Demo
from analytic_engine_client import AnalyticEngineClient
# Initializing client
ae = AnalyticEngineClient(host=CloudPakForData_HOSTNAME, uid=USER, pwd=PASSWORD)

Create a volume service instance
An Analytics Engine instance requires a volume service instance, which acts as the instance home volume to store the Spark events and application logs of all Spark applications you submit under this instance. To create a volume service instance, you have to pass the storage class and storage size of the volume.

# Volume instance creation payload
volume_instance_payload = {
    "metadata": {
        "storageClass": "managed-nfs-storage",
        "storageSize": "20Gi"
    },
    "resources": {},
    "serviceInstanceDescription": "volume instance for spark"
}
# Create volume instance
ae.create_volume(SPARK_INSTANCE_VOLUME, create_arguments=volume_instance_payload)
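
Before creating the AE instance, you may want to confirm that the volume has been provisioned. A minimal sketch, assuming get_volume_status takes just the volume name and returns its current state:

# Check that the instance home volume is ready before creating the AE instance.
# Assumption: get_volume_status takes only the volume name and returns its state.
print(ae.get_volume_status(SPARK_INSTANCE_VOLUME))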

Create AE instance
You need to set the volume service instance name [which you created in the previous step] in the Spark instance creation payload.

# Spark instance creation payload
spark_instance_payload = {
    "metadata": {
        "volumeName": SPARK_INSTANCE_VOLUME,
        "storageClass": "managed-nfs-storage",
        "storageSize": "20Gi"
    },
    "resources": {},
    "serviceInstanceDescription": "Spark instance 1"
}
# Spark instance creation
ae.create_instance(SPARK_INSTANCE, service_instance_version='3.0.1', create_arguments=spark_instance_payload)

View AE instance details
You can get instance details, e.g. which volume instance is tied to this AE instance, the history server UI endpoint, etc.

# Get spark instance details
ae.get_instance_details(SPARK_INSTANCE)

Submit a simple Spark application
To check out how AE and the client work, you can create a simple word count job using this method.

# Simple spark job payload
word_count_payload = {
    "engine": {
        "type": "spark"
    },
    "application_arguments": ["/opt/ibm/spark/examples/src/main/resources/people.txt"],
    "main_class": "org.apache.spark.deploy.SparkSubmit",
    "application": "/opt/ibm/spark/examples/src/main/python/wordcount.py"
}
# Create simple spark job
ae.submit_job(SPARK_INSTANCE, params_json=word_count_payload)
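
If you capture the response from submit_job, you can poll the job state with get_spark_job_status until it completes. A minimal sketch; the "id" field in the response and the job_id parameter name are assumptions and may differ in your client version:

import time

# Assumption: submit_job returns a dict with the job id under "id", and
# get_spark_job_status(instance, job_id=...) returns the job's current state.
job = ae.submit_job(SPARK_INSTANCE, params_json=word_count_payload)
job_id = job["id"]
while True:
    state = ae.get_spark_job_status(SPARK_INSTANCE, job_id=job_id)
    print(state)
    if str(state) not in ("WAITING", "RUNNING"):
        break
    time.sleep(10)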

Get resource quota of AE instance
Resource quota is a very useful feature to control resource consumption by an AE instance. You can read more about resource quotas in this document.

# instance resource quota
ae.get_instance_resource_quota(SPARK_INSTANCE)

Update resource quota for AE instance
The default resource quota is set to 20 CPU and 80 GB. If you want to submit a large Spark application that will consume more resources, you can update the resource quota for your AE instance.

# Update Resource Quota
ae.update_instance_resource_quota(SPARK_INSTANCE, cpu_quota=500, memory_quota='2000g')

Upload & submit a large-scale Spark application
Suppose you have developed a Spark application locally and now you want to run it on Analytics Engine. In that case, you can pass the volume name, your Spark application file/jar, and the Spark job payload as arguments to this API. It will upload your Spark application to the app volume and submit a Spark job to run it.

# app volume instance creation payload
volume_instance_payload = {
    "metadata": {
        "storageClass": "managed-nfs-storage",
        "storageSize": "20Gi"
    },
    "resources": {},
    "serviceInstanceDescription": "sample volume"
}
# Create volume instance
ae.create_volume(APP_VOLUME_INSTANCE, create_arguments=volume_instance_payload)
# Define spark job payload for upload & submit job method.
payload = {
    "engine": {
        "type": "spark",
        "template_id": "spark-2.4.0-jaas-v2-cp4d-template",
        "conf": {
            "spark.app.name": "myjob1"
        },
        "size": {
            "num_workers": 15,
            "worker_size": {
                "cpu": 4,
                "memory": "20g"
            },
            "driver_size": {
                "cpu": 5,
                "memory": "20g"
            }
        }
    },
    "application_arguments": [],
    "main_class": "org.apache.spark.deploy.SparkSubmit"
}
# Upload spark job & submit
ae.upload_and_submit_job(SPARK_INSTANCE, APP_VOLUME_INSTANCE, '/Users/sunilganatra/testSparkApp.py', params_json=payload)

Get all Spark applications
To get all ACTIVE and FINISHED Spark jobs:

# Get all spark jobs of an Instance
ae.get_all_jobs(SPARK_INSTANCE)

Delete a Spark application
To delete a specific Spark job:

# Delete spark job
ae.delete_spark_job(SPARK_INSTANCE,job_id='REPLACE_ME_WITH_JOB_ID')

Start the History Server
To analyze/monitor your Spark application's performance and execution, you can start a history server for the Analytics Engine instance.

# Start History Server
ae.start_history_server(SPARK_INSTANCE)

Get History Server UI endpoint
To view the history server, you can call this API to get its URL, which you can open in your browser to view Spark events.

# Get History Server Url
ae.get_history_server_ui_end_point(SPARK_INSTANCE)

Stop the History Server
To stop the history server:

# Stop History Server
ae.stop_history_server(SPARK_INSTANCE)

Delete all finished Spark applications
If you want to delete all jobs that have reached the FINISHED state, you can call this API.

# Delete all finished spark job
ae.delete_all_finished_spark_job(SPARK_INSTANCE)

Delete all Spark applications
If you want to delete all jobs irrespective of their state, you can call this API.

# Delete all spark job
ae.delete_all_spark_job(SPARK_INSTANCE)

This client also offers many other APIs, listed below. They can make life easier for data scientists who know Python well but don't want to work with the raw REST APIs to automate things in a customer environment.

# list of apis offered by the AnalyticEngineClient
ae.add_file_to_volume(        ae.get_all_volumes(                    ae.host
ae.create_instance(           ae.get_file_from_volume(               ae.job_token
ae.create_volume(             ae.get_history_server_end_point(       ae.start_history_server(
ae.delete_instance(           ae.get_history_server_ui_end_point(    ae.start_volume(
ae.delete_spark_job(          ae.get_instance_details(               ae.stop_history_server(
ae.delete_volume(             ae.get_instance_id(                    ae.submit_job(
ae.download_logs(             ae.get_instance_resource_quota(        ae.submit_word_count_job(
ae.get_all_instances(         ae.get_spark_end_point(                ae.token
ae.get_all_jobs(              ae.get_spark_job_status(               ae.update_instance_resource_quota(
ae.get_all_storage_class(     ae.get_volume_status(                  ae.upload_and_submit_job(
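
For example, once a job has reached the FINISHED state, download_logs can be used to pull its application logs. A minimal sketch; the job_id parameter name is an assumption and may differ in your client version:

# Download the application logs of a job.
# Assumption: download_logs takes the instance name and a job_id keyword argument.
ae.download_logs(SPARK_INSTANCE, job_id='REPLACE_ME_WITH_JOB_ID')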

Please provide your feedback on this article. You can reach out to me, Sunil Ganatra, on Twitter using my handle sunil_ganatra, or connect with me on LinkedIn.


Sunil Ganatra

Senior Engineer@IBM Analytics Engine, Watson Studio Spark Environments and Analytics Engine powered by Apache Spark on IBM Cloud Pak for Data. #k8s #docker