Use Spark and Streaming to predict sentiment | ![]() |
---|---|
![]() |
This notebook contains steps and code to develop a predictive model, and start scoring new data. This notebook introduces commands for getting data and for basic data cleaning and exploration, pipeline creation, model training, model persistance to Watson Machine Learning repository, model deployment, and scoring.
Some familiarity with Python is helpful. This notebook uses Python 3.5 and Apache® Spark 2.
You will use a data set named Twitter Sentiment, which includes many tweets and their binary labels (e.g. POSITIVE or NEGATIVE). Use the details of this data set to predict sentiment of tweets.
The learning goals of this notebook are:
This notebook contains the following parts:
Before you use the sample code in this notebook, you must perform the following setup tasks:
Note: Get your credentials from Manage Credentials section of Watson Machine Learning and MessageHub Service.
The Twitter Sentiment data is available on project's Github page.
In this section you will load the data as an Apache® Spark DataFrame and perform a basic exploration.
Tip: First, you need to install required packages. You can do it by running the following code. You can run the code below after changing cell type to code
.
import wget
from pprint import PrettyPrinter
pp = PrettyPrinter(indent=2, depth=3).pprint
link_to_data = 'https://github.com/pmservice/wml-sample-models/raw/master/spark/sentiment-prediction/data/trainingTweets.csv'
filename = wget.download(link_to_data)
print(filename)
trainingTweets.csv
The csv file trainingTweets.csv is availble on gpfs now. Load the file to Apache® Spark DataFrame using code below.
spark = SparkSession.builder.getOrCreate()
df_data = spark.read\
.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
.option('header', 'true')\
.option('inferSchema', 'true')\
.load(filename)
Explore the loaded data by using Apache® Spark DataFrame methods:
df_data.printSchema()
root |-- id: integer (nullable = true) |-- text: string (nullable = true) |-- label: integer (nullable = true)
As you can see, the data contains 3 fields. label
field is the one we would like to classify tweets.
df_data.show(n=10)
+---+--------------------+-----+ | id| text|label| +---+--------------------+-----+ | 1|realdonaldtrump s...| 1| | 2|cnnvideo hillaryc...| 1| | 3|realdonaldtrump t...| 1| | 4|sensanders the am...| 1| | 5|billburton realdo...| 0| | 6|reince hillarycli...| 0| | 7|bentechpro realdo...| 1| | 8|dahbigj hopeobama...| 0| | 9|theosmelek thuddy...| 0| | 10|realdonaldtrump r...| 0| +---+--------------------+-----+ only showing top 10 rows
print("Total number of records: {count}".format(count=df_data.count()))
Total number of records: 5987
Data set contains 5987 records.
In this section you will learn how to prepare data, create an Apache® Spark machine learning pipeline, and train a model.
In this subsection you will split your data into:
splitted_data = df_data.randomSplit([0.8, 0.18, 0.02], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]
print("Number of training records: {count}".format(count=train_data.count()))
print("Number of testing records: {count}".format(count=test_data.count()))
print("Number of prediction records: {count}".format(count=predict_data.count()))
Number of training records: 4783 Number of testing records: 1076 Number of prediction records: 128
In this section you will create an Apache® Spark machine learning pipeline and then train the model.
In the first step you need to import the Apache® Spark machine learning packages that will be needed in the subsequent steps.
from pyspark.ml.feature import Tokenizer, OneHotEncoder, StringIndexer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline, Model
In the data preprocessing step, convert all the string fields to numeric ones by using Tokenizer and then HashingTF transformer.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
Next, define estimator you want to use for classification. Logistic Regression is used in the following example.
lr = LogisticRegression(maxIter=10, regParam=0.01)
Let's build the pipeline now. A pipeline consists of transformers and an estimator.
pipeline_lr = Pipeline(stages=[tokenizer, hashingTF, lr])
Now, you can train your Logistic Regression model by using previously defined pipeline and train data.
model_lr = pipeline_lr.fit(train_data)
You can evaluate the model on the test data. Area under ROC will be used as evaluation metrics.
predictions = model_lr.transform(test_data)
evaluatorRF = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
accuracy = evaluatorRF.evaluate(predictions)
print("Accuracy = {acc:4.3f}".format(acc=accuracy))
Accuracy = 0.762
Now you can tune your model to achieve better accuracy. For simplicity of this notebook, tuning section is omitted.
In this section you will learn how to store your pipeline and model in Watson Machine Learning repository by using python client libraries.
First, you must install and import Watson Machine Learning client libraries.
!rm -rf $PIP_BUILD/watson-machine-learning-client
!pip install watson-machine-learning-client --upgrade
Requirement already up-to-date: watson-machine-learning-client in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages Requirement already up-to-date: tqdm in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from watson-machine-learning-client) Requirement already up-to-date: tabulate in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from watson-machine-learning-client) Requirement already up-to-date: urllib3 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from watson-machine-learning-client) Requirement already up-to-date: certifi in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from watson-machine-learning-client) Requirement already up-to-date: pandas in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from watson-machine-learning-client) Requirement already up-to-date: lomond in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from watson-machine-learning-client) Requirement already up-to-date: requests in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from watson-machine-learning-client) Requirement already up-to-date: python-dateutil>=2 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from pandas->watson-machine-learning-client) Requirement already up-to-date: numpy>=1.9.0 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from pandas->watson-machine-learning-client) Requirement already up-to-date: pytz>=2011k in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from pandas->watson-machine-learning-client) Requirement already up-to-date: six>=1.10.0 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from lomond->watson-machine-learning-client) Requirement already up-to-date: chardet<3.1.0,>=3.0.2 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from requests->watson-machine-learning-client) Requirement already up-to-date: idna<2.7,>=2.5 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s86b-18e61b28c674e4-3fbaf243aed6/.local/lib/python3.5/site-packages (from requests->watson-machine-learning-client)
from watson_machine_learning_client import WatsonMachineLearningAPIClient
Authenticate to the Watson Machine Learning service on IBM Cloud.
Tip: Authentication information (your credentials) can be found in the Service Credentials tab of the service instance that you created on IBM Cloud. If you cannot see the instance_id field in Service Credentials, click New credential (+) to generate new authentication information.
Action: Enter your Watson Machine Learning service instance credentials here.
wml_credentials={
"url": "https://ibm-watson-ml.mybluemix.net",
"access_key": "****",
"username": "****",
"password": "****",
"instance_id": "****"
}
# The code was removed by Watson Studio for sharing.
Create WML client and authorize it.
client = WatsonMachineLearningAPIClient(wml_credentials)
client.version
'1.0.53'
In this subsection you will learn how to save pipeline and model artifacts to your Watson Machine Learning instance.
saved_model_details = client.repository.store_model(model=model_lr, meta_props={"name": "Sentiment Prediction Model"}, training_data=train_data, pipeline=pipeline_lr)
model_uid = client.repository.get_model_uid(saved_model_details)
print(model_uid)
9faf9502-87ca-4a15-8581-060cc84ccc5b
Check model details:
print(saved_model_details)
{'metadata': {'created_at': '2018-03-29T10:37:16.069Z', 'url': 'https://ibm-watson-ml.mybluemix.net/v3/wml_instances/3f6e5c2b-4880-46aa-9d79-62e90ccc9d56/published_models/9faf9502-87ca-4a15-8581-060cc84ccc5b', 'modified_at': '2018-03-29T10:37:16.143Z', 'guid': '9faf9502-87ca-4a15-8581-060cc84ccc5b'}, 'entity': {'latest_version': {'created_at': '2018-03-29T10:37:16.143Z', 'url': 'https://ibm-watson-ml.mybluemix.net/v3/ml_assets/models/9faf9502-87ca-4a15-8581-060cc84ccc5b/versions/42cec03e-bdde-4996-a0b9-118e143f804b', 'guid': '42cec03e-bdde-4996-a0b9-118e143f804b'}, 'runtime_environment': 'spark-2.1', 'name': 'Sentiment Prediction Model', 'learning_configuration_url': 'https://ibm-watson-ml.mybluemix.net/v3/wml_instances/3f6e5c2b-4880-46aa-9d79-62e90ccc9d56/published_models/9faf9502-87ca-4a15-8581-060cc84ccc5b/learning_configuration', 'model_type': 'mllib-2.1', 'input_data_schema': {'type': 'struct', 'fields': [{'metadata': {}, 'nullable': True, 'type': 'integer', 'name': 'id'}, {'metadata': {}, 'nullable': True, 'type': 'string', 'name': 'text'}]}, 'feedback_url': 'https://ibm-watson-ml.mybluemix.net/v3/wml_instances/3f6e5c2b-4880-46aa-9d79-62e90ccc9d56/published_models/9faf9502-87ca-4a15-8581-060cc84ccc5b/feedback', 'label_col': 'label', 'deployments': {'url': 'https://ibm-watson-ml.mybluemix.net/v3/wml_instances/3f6e5c2b-4880-46aa-9d79-62e90ccc9d56/published_models/9faf9502-87ca-4a15-8581-060cc84ccc5b/deployments', 'count': 0}, 'training_data_schema': {'type': 'struct', 'fields': [{'metadata': {}, 'nullable': True, 'type': 'integer', 'name': 'id'}, {'metadata': {}, 'nullable': True, 'type': 'string', 'name': 'text'}, {'metadata': {}, 'nullable': True, 'type': 'integer', 'name': 'label'}]}, 'learning_iterations_url': 'https://ibm-watson-ml.mybluemix.net/v3/wml_instances/3f6e5c2b-4880-46aa-9d79-62e90ccc9d56/published_models/9faf9502-87ca-4a15-8581-060cc84ccc5b/learning_iterations', 'evaluation_metrics_url': 'https://ibm-watson-ml.mybluemix.net/v3/wml_instances/3f6e5c2b-4880-46aa-9d79-62e90ccc9d56/published_models/9faf9502-87ca-4a15-8581-060cc84ccc5b/evaluation_metrics', 'author': {}}}
In this subsection you will learn how to load back saved model from specified instance of Watson Machine Learning.
loaded_model = client.repository.load(model_uid)
You can check the type of model. As it is the same model you saved, you can use it for local scoring.
print(type(loaded_model))
<class 'pyspark.ml.pipeline.PipelineModel'>
As you can see the type is correct. You have already learned how save and load the model from Watson Machine Learning repository.
In this section you will learn how to score test data using loaded model and visualize the prediction results with plotly package.
In this subsection you will score predict_data
data set.
predictions = loaded_model.transform(predict_data)
Preview the results by calling the show() method on the predictions DataFrame.
predictions.show(5)
+---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+ | id| text|label| words| features| rawPrediction| probability|prediction| +---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+ |254|realdonaldtrump t...| 1|[realdonaldtrump,...|(262144,[4312,961...|[-1.3150612511415...|[0.21164114610918...| 1.0| |256|realdonaldtrump j...| 0|[realdonaldtrump,...|(262144,[14,13396...|[5.48207446382384...|[0.99585655006955...| 0.0| |296|realdonaldtrump t...| 1|[realdonaldtrump,...|(262144,[15889,21...|[-1.3175186267989...|[0.21123142555360...| 1.0| |312|sensanders keep t...| 0|[sensanders, keep...|(262144,[32890,91...|[3.29411163264101...|[0.96422625174639...| 0.0| |362|katiedaviscourt i...| 0|[katiedaviscourt,...|(262144,[16332,21...|[3.83815328742004...|[0.97892057960460...| 0.0| +---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+ only showing top 5 rows
By tabulating a count, you can see the split by sentiment.
predictions.select("label").groupBy("label").count().show()
+-----+-----+ |label|count| +-----+-----+ | 1| 49| | 0| 79| +-----+-----+
In this subsection you will explore prediction results with Plotly, which is an online analytics and data visualization tool.
Tip: First, you need to install required packages. You can do it by running the following code. Run it only one time.
import sys
import pandas
import plotly.plotly as py
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import cufflinks as cf
import plotly.graph_objs as go
init_notebook_mode(connected=True)
sys.path.append("".join([os.environ["HOME"]]))
You have to convert the Apache Spark DataFrame to a Pandas DataFrame to be used by ploting function.
predictions_pdf = predictions.select("prediction", "label", "text").toPandas()
cumulative_stats = predictions_pdf.groupby(['label']).count()
labels_data_plot = cumulative_stats.index
values_data_plot = cumulative_stats['text']
Plot a pie chart that shows the predicted sentiment label.
product_data = [go.Pie(
labels=labels_data_plot,
values=values_data_plot,
)]
product_layout = go.Layout(
title='Sentiment',
)
fig = go.Figure(data=product_data, layout=product_layout)
iplot(fig)
In this section you will learn how to create streaming deployment by using the Watson Machine Learning REST API and work with IBM Cloud MessageHub. For more information about REST APIs, see the Swagger Documentation.
client.repository.list_models()
------------------------------------ ---------------------------------- ------------------------ ----------------- GUID NAME CREATED FRAMEWORK 43986a55-0f0e-493b-9b98-0712f918b0ba MNIST CNN model 2018-03-15T16:12:06.503Z tensorflow-1.5 a5a952e1-fa6f-4c64-88f7-293b367b7e1f MNIST best model 2018-03-16T10:12:16.773Z tensorflow-1.5 c755c740-dab6-4d5d-b873-1060885d55d5 MNIST best model 2018-03-16T10:12:31.990Z tensorflow-1.5 861b6b73-6861-49c0-867c-d171d6d9d545 MNIST best model 2018-03-16T13:00:40.461Z tensorflow-1.5 f7219c8f-fccb-4d26-a1bd-f51544090eae training-S3ttnlgig_2 best model 2018-03-16T13:32:33.747Z tensorflow-1.5 327242a2-d2c0-400f-b673-4f33ad7ec17d product-line-prediction 2018-03-16T18:08:31.260Z mllib-2.0 a0cb79ca-e937-4cbf-b7e5-23dad2d2ecec drug-selection 2018-03-16T18:10:25.046Z mllib-2.0 97822727-4bd1-4df0-95df-8ff9536ea01d My cool mnist model 2018-03-28T09:02:31.509Z tensorflow-1.5 84070d90-6db7-4a12-8da6-26aaf18e9378 My cool mnist model tf-horovod 2018-03-28T11:15:53.446Z tensorflow-1.5 b6e59ce5-7d24-4ce1-90f8-f6a918ad71ca My cool mnist model tf-distributed 2018-03-28T11:15:56.061Z tensorflow-1.5 91149430-5588-4ee6-ac6a-695cd2392aa8 My cool mnist model tf-ddl 2018-03-28T11:15:59.813Z tensorflow-1.5 5bcf67ab-cd88-49c9-99fd-99dde60eff70 SPSS model for Churn prediction 2018-03-28T13:11:28.164Z spss-modeler-17.1 9fb96bc1-bc12-4717-a03f-9b009a32d578 SPSS model for Churn prediction 2018-03-28T13:20:40.791Z spss-modeler-18.1 0da45aa2-5378-4dc1-8617-3d209499f466 my spark model 2018-03-28T14:49:01.055Z mllib-2.1 298832f7-d21d-4a07-a028-b5716dba1264 Best Heart Drug Selection 2018-03-29T08:35:31.303Z mllib-2.1 70a8b8ad-ef04-4d9f-a1e7-679719577b62 Product line model 2018-03-29T09:21:46.826Z mllib-2.1 9faf9502-87ca-4a15-8581-060cc84ccc5b Sentiment Prediction Model 2018-03-29T10:37:16.069Z mllib-2.1 ------------------------------------ ---------------------------------- ------------------------ -----------------
Action: Put credentials from your instance of Spark Service in cell bellow.
spark_credentials={
"tenant_id": "****",
"tenant_id_full": "****",
"cluster_master_url": "https://spark.bluemix.net",
"tenant_secret": "****",
"instance_id": "****",
"plan": "ibm.SparkService.PayGoPersonal"
}
# The code was removed by Watson Studio for sharing.
Encode the spark credentials.
import base64
import json
spark_instance = {
"credentials": spark_credentials,
"version": "2.0"
}
encoded_spark_instance = base64.b64encode(json.dumps(spark_instance).encode('utf-8'))
input_topic_name = 'streamingi'
output_topic_name = 'streamingo'
Action: Please paste credentials of your MessageHub Service to cell bellow.
messagehub_credentials={
"instance_id": "****",
"mqlight_lookup_url": "https://mqlight-lookup-prod01.messagehub.services.us-south.bluemix.net/Lookup?serviceId=fa4d499e-b908-4982-84f0-69c30c1b30a0",
"api_key": "****",
"kafka_admin_url": "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443",
"kafka_rest_url": "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443",
"kafka_brokers_sasl": [
"kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka02-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka05-prod01.messagehub.services.us-south.bluemix.net:9093"
],
"user": "****",
"password": "****"
}
# The code was removed by Watson Studio for sharing.
import requests
messagehub_instance = {x: messagehub_credentials[x] for x in messagehub_credentials.keys() if x != "instance_id"}
header_streaming = {'Content-Type': 'application/json', 'Authorization': 'Bearer {token}'.format(token=client.wml_token), 'X-Spark-Service-Instance': encoded_spark_instance}
payload_streaming = {"type":"stream", "name": "Sentiment Prediction", "description": "Streaming Deployment",
"input": {"connection": messagehub_instance, "source": { "type": "kafka", "topic": input_topic_name}},
"output": {"connection": messagehub_instance, "target": {"type": "kafka", "topic": output_topic_name}}}
endpoint_deployments = client.repository.get_model_url(saved_model_details) + "/deployments"
response_streaming = requests.post(endpoint_deployments, json=payload_streaming, headers=header_streaming)
print(response_streaming)
pp(json.loads(response_streaming.text))
Now, you have streaming deployment of your model ready to make predictions.
When we want to make predictions with streaming deployment of our model we have to push messages to input topic. Streaming deployment will process that message and write results to output topic.
messagehub_topic_header = {'X-Auth-Token': messagehub_credentials['api_key']}
messagehub_endpoint = 'https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/topics/'
messagehub_response = requests.get(messagehub_endpoint, headers=messagehub_topic_header)
print(messagehub_response)
pp(json.loads(messagehub_response.text))
<Response [200]> ['strimingi', 'strimingo']
Note: You should see two topics: streamingi and streamingo created in setup step.
We have to encode input data to send it to stream.
encoded_payload_value = base64.b64encode(json.dumps({'id': 1, 'text':'My first streaming deployment! It feels so awesome!!'}).encode('utf-8'))
print(encoded_payload_value)
b'eyJpZCI6IDEsICJ0ZXh0IjogIk15IGZpcnN0IHN0cmVhbWluZyBkZXBsb3ltZW50ISBJdCBmZWVscyBzbyBhd2Vzb21lISEifQ=='
In order to push encoded data to our topic, we will use below curl call.
messagehub_header = dict({'Content-Type': 'application/vnd.kafka.binary.v1+json'}, **messagehub_topic_header)
input_payload = {'records':[{ 'key': 'null', 'value': encoded_payload_value}]}
messagehub_input_endpoint = messagehub_endpoint + 'streamingi'
response_message_input = requests.post(messagehub_input_endpoint, json=input_payload, headers=messagehub_header)
print(response_message_input)
pp(json.loads(response_message_input.text))
Create MessageHub consumer
output_consumer_payload = {"format": "binary", "auto.offset.reset": "smallest"}
messagehub_output_consumer_endpoint = 'https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/consumers/my_output_consumer'
messagehub_output_consumer_response = requests.post(messagehub_output_consumer_endpoint, json=output_consumer_payload, headers=messagehub_header)
print(messagehub_output_consumer_response)
pp(json.loads(messagehub_output_consumer_response.text))
Read messages from output topic
messagehub_read_output_endpoint = "{uri}/topics/{output_topic}".format(uri=json.loads(messagehub_output_consumer_response.text).get('base_uri'), output_topic=output_topic_name)
messagehub_read_output_response = requests.get(messagehub_read_output_endpoint, headers=messagehub_header)
print(messagehub_read_output_response)
pp(json.loads(messagehub_read_output_response.text))
Decode the output message
message_output_encoded = json.loads(messagehub_read_output_response.text)[0].get('value')
message_output_decoded = base64.b64decode(message_output_encoded)
pp(message_output_decoded)
You successfully completed this notebook! You learned how to use Apache Spark machine learning as well as Watson Machine Learning for model creation and deployment. Check out our Online Documentation for more samples, tutorials, documentation, how-tos, and blog posts.
Umit Mert Cakmak is a Data Scientist at IBM with a track record of developing enterprise-level applications that substantially increases clients' ability to turn data into actionable knowledge.
Copyright © 2017, 2018 IBM. This notebook and its source code are released under the terms of the MIT License.