Keisan

Real-Time Machine Learning Pipeline with Apache Spark

How to build predictive models using a real-time machine learning pipeline in Apache Spark


In a previous article entitled 'Real-Time Data Pipeline with Apache Kafka and Spark', I described how we can build a high-throughput, scalable, reliable and fault-tolerant data pipeline. That pipeline fetches event-based data and streams those events to Apache Spark, where we processed them. I ended that article by simply using Apache Spark to consume the event-based data and print it to the console. In my most recent article, entitled 'Apache Kafka Producer with Avro Bijection', I updated our data pipeline to use an Apache Kafka Producer to collect tweets using Twitter's Hosebird Client within a Twitter4j wrapper, serialising the tweets using the Avro library and Bijection. In this article, we will use that updated event-based data pipeline to build predictive models in real time using Apache Spark.

The Goal

Our goal in this article will be to perform real-time sentiment analysis on tweets about airlines, which is particularly pertinent given the recent worldwide IT failure at British Airways! We will attempt to automatically classify tweets about airlines as being either negative or not-negative towards that airline as our data pipeline receives them in near real time.

The Data

The dataset that we will use to build our training model is a collection of 14,640 tweets regarding U.S. airlines from February 2015 sourced from CrowdFlower. The tweets have been given a classification as to whether their textual content is positive, negative or neutral towards the airline in question. The dataset can be downloaded using the following link.

The Basics

Let's start by explaining some of the basic theory behind the modelling concepts that I will introduce later in the article. The overriding intention behind our Knowledge Base is to make knowledge regarding big data and data analytics accessible to as many people as possible. That being said, whilst this section introduces some of the more fundamental concepts, it would be difficult to explain the mathematical theory in detail in just one page. Therefore, if this article sparks your interest (and I hope that it will!), I would encourage you to check out some of the fantastic online courses available at sites like Coursera and edX, or even sign up to more formal courses offered by organisations such as The Open University.

Predictive Models

Predictive models are built to make predictions! Supervised Learning is a type of machine learning in which a training dataset with known outcomes is used to train a model. The model is then applied to new data that it has not seen before to make predictions. Those predictions could take one of two forms:

  • A categorical prediction (e.g. yes/no, positive/negative/neutral etc.), in which case the algorithm is called a Classification algorithm.
  • A continuous prediction (e.g. price, weight etc.), in which case it is called a Regression algorithm.

Examples of Supervised Learning algorithms include Linear Regression, Logistic Regression and Classification and Regression Trees. In this article, I will introduce the latter, Classification and Regression Trees. I will show how we can use these algorithms to train a model to classify our tweets based on the training dataset that we downloaded above. We will then take this trained model and apply it to the stream of real-time tweets from our data pipeline, which it has never seen before, in order to try and classify them too!

Classification and Regression Trees (CART)

Classification and Regression Tree (CART) models generate trees that can be traversed to predict an outcome. For those of you familiar with Linear Regression and Logistic Regression, CART does not assume a linear model as Linear Regression does, and it is much easier to understand and interpret than Logistic Regression. The following image, taken from Wikipedia, shows a simple tree that can help us predict whether a passenger on the Titanic was likely to survive based on just a few attributes. The numbers below each leaf show the probability of survival and the percentage of observations in that leaf.

CART models aim to split data into homogeneous subsets. The most basic prediction that you can make from a CART model is therefore to take the majority outcome in each subset. A new observation falls into a subset based on its attributes, and you simply predict the existing majority in that subset. The tree structure makes CART models very interpretable: you start at the top of the tree and traverse down, moving to the left for positive responses to each split condition and to the right for negative responses. As the name suggests, CART models allow you to predict either continuous or categorical outcomes. In this article, we will be using a CART model to help us classify tweets as being negative or not-negative about an airline, i.e. a classification problem.
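The sketch below makes "take the majority in each subset" concrete. It is a minimal plain-Scala illustration, not Spark's implementation. It scores a candidate binary split by weighted Gini impurity, one common homogeneity measure and the default impurity of Spark's DecisionTreeClassifier. It then predicts the majority label within a subset. The object name CartSketch and the toy tweets in the usage note are hypothetical.

```scala
object CartSketch {

  /** Gini impurity of a set of labels: 1 - sum(p_k^2). 0.0 means the subset is perfectly homogeneous. */
  def gini[A](labels: Seq[A]): Double =
    if (labels.isEmpty) 0.0
    else {
      val n = labels.size.toDouble
      // Proportion of each distinct label within the subset
      val proportions = labels.groupBy(identity).values.map(_.size / n)
      1.0 - proportions.map(p => p * p).sum
    }

  /** Majority label of a non-empty subset: the prediction made at a leaf. */
  def majority[A](labels: Seq[A]): A =
    labels.groupBy(identity).maxBy(_._2.size)._1

  /** Weighted Gini impurity after splitting `rows` into (left, right) subsets using `goLeft`. */
  def splitImpurity[R, A](rows: Seq[R])(label: R => A, goLeft: R => Boolean): Double = {
    val (left, right) = rows.partition(goLeft)
    val n = rows.size.toDouble
    (left.size / n) * gini(left.map(label)) + (right.size / n) * gini(right.map(label))
  }
}
```

Consider five toy tweets. Two are negative: "flight delayed again" and "delayed and lost bag". Three are not negative: "great crew", "thanks for the upgrade" and "delayed but crew great". The unsplit Gini impurity is 0.48. Splitting on whether a tweet contains "delayed" lowers the weighted impurity to about 0.27, so a tree builder would prefer that split. The "delayed" subset would then predict its majority label, negative.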

Random Forests

Random Forests are designed to further improve the accuracy of CART models by building multiple CART trees, i.e. a Forest. The disadvantage of using Random Forests is that your models become harder to understand and interpret, but they may improve the accuracy of your predictions. To make a prediction for a new observation, each Tree in the Forest votes on the outcome and you simply pick the outcome that receives the most votes. To build a Random Forest, each constituent Tree is trained on a bootstrapped sample of the data, i.e. training data selected randomly with replacement. Each Tree is also forced to split on only a random subset of the available independent variables. Since each tree uses different independent variables and different training data, we generate a Forest of different trees. The following image provides an overview of the basic principle behind Random Forests.
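The two sources of randomness described above, plus the final majority vote, can be sketched in plain Scala. This is an illustrative sketch with hypothetical names, not Spark's RandomForestClassifier. Note that the classic algorithm re-draws the random feature subset at each split rather than once per tree. Spark does the same, controlled by its featureSubsetStrategy parameter.

```scala
import scala.util.Random

object ForestSketch {

  /** Bootstrap sample: rows.size draws made uniformly at random WITH replacement (rows must be non-empty). */
  def bootstrap[R](rows: IndexedSeq[R], rng: Random): IndexedSeq[R] =
    IndexedSeq.fill(rows.size)(rows(rng.nextInt(rows.size)))

  /** A random subset of k distinct independent variables that a tree (or split) may consider. */
  def featureSubset(features: Seq[String], k: Int, rng: Random): Seq[String] =
    rng.shuffle(features).take(k)

  /** Combine the predictions of every tree in the forest by majority vote. */
  def vote[A](predictions: Seq[A]): A =
    predictions.groupBy(identity).maxBy(_._2.size)._1
}
```

Passing a seeded Random makes the sketch reproducible. That is also why Spark's tree ensembles expose a seed parameter.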

Classification Matrix

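A Classification (or Confusion) Matrix can help us to quantify the accuracy of our predictive models. A Classification Matrix is built by comparing the predicted values from the model (columns) against the actual values (rows), as follows:

                      Predicted Negative      Predicted Positive
  Actual Negative     TRUE Negatives (TN)     FALSE Positives (FP)
  Actual Positive     FALSE Negatives (FN)    TRUE Positives (TP)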

Each cell in the Matrix then provides the number of observations that fall into that category. The TRUE Negatives and TRUE Positives are the number of observations that the model predicts correctly. The FALSE Negatives and FALSE Positives are the number of observations that the model predicts incorrectly. This therefore gives rise to a number of quantifiable measures that allow us to state how accurate the model is in comparison to other models:

  • Sensitivity or True Positive Rate = TP / (TP + FN) measures the percentage of positive outcomes that were predicted correctly by the model.
  • Specificity or True Negative Rate = TN / (TN + FP) measures the percentage of negative outcomes that were predicted correctly by the model.
  • Overall Accuracy = (TN + TP) / N measures the overall accuracy of the model given N observations.
  • Overall Error Rate = (FP + FN) / N measures the overall error rate of the model given N observations.
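The four measures above follow directly from the cell counts. Below is a minimal sketch, assuming a binary problem where true is the Positive outcome, as with our negative-sentiment label. ConfusionCounts is a hypothetical name. Each measure returns NaN if its denominator is zero.

```scala
/** Cell counts of a binary Classification (Confusion) Matrix. */
case class ConfusionCounts(tp: Long, tn: Long, fp: Long, fn: Long) {
  val n: Long = tp + tn + fp + fn

  def sensitivity: Double = tp.toDouble / (tp + fn) // True Positive Rate
  def specificity: Double = tn.toDouble / (tn + fp) // True Negative Rate
  def accuracy: Double = (tn + tp).toDouble / n     // Overall Accuracy
  def errorRate: Double = (fp + fn).toDouble / n    // Overall Error Rate
}

object ConfusionCounts {
  /** Tally (actual, predicted) label pairs, treating `true` as the Positive outcome. */
  def fromPairs(pairs: Seq[(Boolean, Boolean)]): ConfusionCounts = ConfusionCounts(
    tp = pairs.count { case (actual, predicted) => actual && predicted },
    tn = pairs.count { case (actual, predicted) => !actual && !predicted },
    fp = pairs.count { case (actual, predicted) => !actual && predicted },
    fn = pairs.count { case (actual, predicted) => actual && !predicted }
  )
}
```

For instance, take TP = 30, FN = 10, TN = 50 and FP = 10. Sensitivity is 0.75 and specificity is about 0.83. Overall accuracy and error rate are 0.8 and 0.2 respectively.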


Natural Language Processing (NLP)

Tweets are obviously textual in nature. This means that they do not conform to any pre-defined structure and may include poor spelling, slang and other forms of non-traditional grammar. Moreover, hundreds of millions of tweets are generated every day, far too many for humans to analyse manually. Natural Language Processing (NLP) is the field concerned with using computers to understand human language and derive meaning from text. In the models that we will build, the words themselves will be the variables that our CART model uses to classify the sentiment behind our tweets!

Common Terminology

Here is some common terminology that will appear frequently in my explanations for your reference:

  • Document - A Document is a defined container of words. A Document could be a single record, article or tweet.
  • Corpus - A Corpus is a collection of Documents.
  • Term Frequency - The number of times that a term appears in a document.
  • Inverse Document Frequency - A measure of whether a term is common or rare in a corpus. For example, the term 'the' may appear numerous times in a given corpus. Using Term Frequency alone, this term would be given a weighting proportional to its raw frequency, whilst more meaningful but rarer terms would not necessarily be given more weight. Inverse Document Frequency offsets the raw frequency of a term by taking into account how many documents in the wider corpus contain it.


Pre-Processing Data

One of the basic features in NLP is the idea of pre-processing the textual data before building a predictive model in an attempt to standardise the text from different sources as much as possible. Common pre-processing techniques include:

  • Common Casing - Convert all the text into either lower-case or upper-case.
  • Non-meaningful punctuation - A basic approach is to remove all non-alphanumeric characters. But be careful as some non-alphanumeric characters are meaningful, such as @ and # in tweets. Therefore, the set of non-alphanumeric characters removed should be defined dependent on the question that you are trying to answer.
  • Stop Words - Words such as 'I', 'this' and 'there' are used to structure a sentence to convey its meaning, but are not necessarily helpful in determining the underlying sentiment. These Stop Words are commonly removed from text, with the added benefit of reducing the size of the dataset.
  • Stemming - This is the process of reducing related words to a common stem. For example, the words connection, connections, connective, connected and connecting can all be reduced to their common stem connect without losing the underlying meaning. That is not to say that Stemming is error-free; in fact, Stemming algorithms are liable to make mistakes. However, for the purposes of building a predictive model and further reducing the size of the dataset, it is a valuable technique.
  • Lemmatisation - Whilst Stemming quickly reduces words to a base form, it does not take context into account. It therefore cannot differentiate between words whose meaning depends on their position within a sentence or on their context. Rather than crudely reducing words to a common stem, Lemmatisation aims to remove inflectional endings only, in order to return the dictionary form of a word, called the Lemma. For example, the words am, is, being and was can all be reduced to the lemma be, whereas a Stemmer could not infer this. Whilst Lemmatisation preserves context and meaning to a greater extent, it comes at the cost of additional complexity and processing time.
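Here is a minimal plain-Scala sketch of the first four techniques. The stop-word list is a tiny hypothetical sample. naiveStem is a deliberately crude suffix stripper, not the Porter algorithm or any other published stemmer. Following the advice above, the punctuation filter keeps @ and #. Lemmatisation is omitted because it needs a dictionary and part-of-speech information, which is why the Stanford CoreNLP library is used later in this article.

```scala
object PreProcessSketch {

  // A tiny, hypothetical stop-word list for illustration only
  val stopWords: Set[String] = Set("i", "this", "there", "the", "a", "an", "is", "was", "and", "to", "my")

  // Common Casing + Non-meaningful punctuation: lowercase, then keep only letters, digits, whitespace, '@' and '#'
  def normalise(text: String): String =
    text.toLowerCase.replaceAll("[^a-z0-9@#\\s]", "")

  // Split on runs of whitespace, dropping any empty tokens
  def tokenize(text: String): Seq[String] =
    text.split("\\s+").toSeq.filter(_.nonEmpty)

  // Stop Words: drop tokens that appear in the stop-word list
  def removeStopWords(tokens: Seq[String]): Seq[String] =
    tokens.filterNot(stopWords.contains)

  // Stemming (naive): strip the first matching suffix, but only if at least 3 characters remain
  def naiveStem(token: String): String = {
    val suffixes = Seq("ions", "ion", "ive", "ed", "ing", "s")
    suffixes.find(s => token.endsWith(s) && token.length - s.length >= 3)
      .map(s => token.dropRight(s.length))
      .getOrElse(token)
  }

  def preprocess(text: String): Seq[String] =
    removeStopWords(tokenize(normalise(text))).map(naiveStem)
}
```

For example, preprocess("I was connecting to my flight and the connection is DELAYED!!") returns Seq("connect", "flight", "connect", "delay"). A suffix stripper this crude will also mangle some words, which illustrates why Stemming is described above as error-prone.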


Bag of Words

After we have pre-processed our data, how do we go about identifying the independent variables to use for our predictive model from just text? Well, one approach is called Bag of Words. In this approach, you simply count the number of times each word appears in the text (i.e. the frequency of that word), resulting in one feature for each word which can be used as a baseline. Each word-frequency pairing then becomes an independent variable that we can use in our model.
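A Bag of Words can be sketched in a few lines of plain Scala. First count each token per document, then lay the counts out over a shared vocabulary so that every document becomes a vector of the same length. The names are hypothetical, and the tokens are assumed to have been pre-processed already.

```scala
object BagOfWordsSketch {

  /** Count how often each term appears in one (already pre-processed) document. */
  def termFrequencies(tokens: Seq[String]): Map[String, Int] =
    tokens.groupBy(identity).map { case (term, occurrences) => term -> occurrences.size }

  /** Build a sorted vocabulary for the corpus and one count vector per document over it. */
  def vectorize(corpus: Seq[Seq[String]]): (IndexedSeq[String], Seq[IndexedSeq[Int]]) = {
    val vocabulary = corpus.flatten.distinct.sorted.toIndexedSeq
    val vectors = corpus.map { doc =>
      val tf = termFrequencies(doc)
      // Position i of each vector holds the count of vocabulary(i), or 0 if absent
      vocabulary.map(term => tf.getOrElse(term, 0))
    }
    (vocabulary, vectors)
  }
}
```

Take the two documents Seq("flight", "delay", "delay") and Seq("great", "flight"). The vocabulary is (delay, flight, great), and the vectors are (2, 1, 0) and (0, 1, 1).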

Term Frequency-Inverse Document Frequency (TF-IDF)

Rather than simply counting the frequency of each word, Term Frequency-Inverse Document Frequency (TF-IDF) aims to indicate how important a word is to a document. It does this by taking into account how often the word appears across the entire dataset. The TF-IDF metric increases proportionally to the number of times that a word appears in a document, offset by the number of documents in the dataset that contain the word.
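The following plain-Scala sketch computes TF-IDF weights. It uses the smoothed IDF formula that Spark MLlib documents for its IDF Estimator, log((|D| + 1) / (DF(t, D) + 1)) with a natural logarithm. Other libraries use slightly different variants. TfIdfSketch is a hypothetical name.

```scala
object TfIdfSketch {

  /** DF(t, D): the number of documents in the corpus that contain term t at least once. */
  def documentFrequencies(corpus: Seq[Seq[String]]): Map[String, Int] =
    corpus.flatMap(_.distinct).groupBy(identity).map { case (term, docs) => term -> docs.size }

  /** Smoothed IDF: log((|D| + 1) / (DF(t, D) + 1)); it is 0.0 for a term found in every document. */
  def idf(numDocs: Int, df: Int): Double =
    math.log((numDocs + 1.0) / (df + 1.0))

  /** TF-IDF weight of every term in every document: TF(t, d) * IDF(t, D). */
  def tfIdf(corpus: Seq[Seq[String]]): Seq[Map[String, Double]] = {
    val df = documentFrequencies(corpus)
    corpus.map { doc =>
      doc.groupBy(identity).map { case (term, occurrences) =>
        term -> occurrences.size * idf(corpus.size, df(term))
      }
    }
  }
}
```

Consider the corpus Seq(Seq("flight", "delay", "delay"), Seq("flight", "great")). The term "flight" appears in every document, so it receives a weight of 0. Meanwhile "delay" in the first document receives 2 * ln(3/2), which is about 0.81.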

Feature Vector

By transforming text into a series of word-frequency pairings (as with the Bag of Words approach) or a series of word-importance pairings (as with the TF-IDF approach), we are essentially transforming text into a numeric vector called a Feature Vector. This is important because it is these Feature Vectors that are then passed to machine learning algorithms to train and test supervised predictive models.

Apache Spark Machine Learning Pipeline

Before we begin to build our predictive models, I want to explain some of the core components in Apache Spark's Machine Learning Library that allow us to build predictive models.

  • Transformer - A Transformer in Spark essentially takes a DataFrame as its input and generates a new DataFrame, usually by appending new columns to the original DataFrame as a result of read and map operations. For example, in a learning model, the input DataFrame may contain a column containing the Feature Vectors. The Transformer may then read this column and predict the outcome (label) for each Feature Vector, generating a new column containing the predicted labels.
  • Estimator - An Estimator in Spark is essentially a learning algorithm that is fitted to data to produce a Model, where the Model is itself a Transformer. For example, the Spark Estimator LogisticRegression can be fitted to a training dataset, generating a LogisticRegressionModel, which is the resultant Model and Transformer.
  • Pipeline - A Pipeline in Spark is an ordered series of stages where each stage is either a Transformer or an Estimator.


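A typical Training Pipeline chains the following stages: Raw Text -> Split into Words (Transformer) -> Term Frequency (Transformer) -> Estimator, which outputs a trained Model.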



In the example above, the raw text is split up to generate a sequence of words per document. Feature Vectors are created from these sequences using some sort of Term Frequency algorithm, and these Feature Vectors are then passed to an Estimator to train a model.

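The resultant Testing Pipeline chains these stages: Raw Text -> Split into Words (Transformer) -> Term Frequency (Transformer) -> trained Model (Transformer) -> Predictions.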



Here, new unseen raw text is again split up to generate a sequence of words per document and follows the same processing steps. This time, however, the Model that the Estimator generated above is applied to the new Feature Vectors to generate predictions. By using Pipelines and PipelineModels, we can build training models and then apply exactly the same processing steps to our stream of real-time event-based data!
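The training and testing pipelines described above can be sketched with Spark ML's Pipeline API. This is a minimal illustration, assuming Spark 2.x with spark-sql and spark-mllib on the classpath, as declared in the POM below, and a handful of made-up tweets. It uses Spark's simple whitespace Tokenizer rather than the Stanford lemmatizer. The object and method names (PipelineSketch, buildPipeline, fitAndPredict) are hypothetical.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{HashingTF, IDF, StringIndexer, Tokenizer}
import org.apache.spark.sql.SparkSession

object PipelineSketch {

  /** An ordered series of Transformers and Estimators; the Pipeline is itself an Estimator. */
  def buildPipeline(): Pipeline = {
    // Estimator: maps the "true"/"false" label strings to numeric label indices
    val labelIndexer = new StringIndexer().setInputCol("negative_sentiment_label").setOutputCol("label")
    // Transformer: lowercases the text and splits it on whitespace
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    // Transformer: hashes each word into a fixed-length term-frequency vector
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(1 << 10)
    // Estimator: learns document frequencies, producing an IDFModel that rescales the TF vectors
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // Estimator: learns a decision tree from the TF-IDF Feature Vectors
    val tree = new DecisionTreeClassifier().setLabelCol("label").setFeaturesCol("features")
    new Pipeline().setStages(Array(labelIndexer, tokenizer, hashingTF, idf, tree))
  }

  /** Fit the Training Pipeline, then apply the resulting PipelineModel to unseen tweets. */
  def fitAndPredict(spark: SparkSession): Array[Double] = {
    import spark.implicits._
    // Made-up example tweets, for illustration only
    val training = Seq(
      ("my flight was delayed again", "true"),
      ("lost my bag and nobody helped", "true"),
      ("great crew and a smooth flight", "false"),
      ("thanks for the quick upgrade", "false")
    ).toDF("text", "negative_sentiment_label")
    val unseen = Seq(
      ("delayed again and my bag is lost", "true"),
      ("smooth flight and a great crew", "false")
    ).toDF("text", "negative_sentiment_label")

    // PipelineModel: every fitted stage is now a Transformer
    val model = buildPipeline().fit(training)
    model.transform(unseen).select("prediction").as[Double].collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PipelineSketch").master("local[*]").getOrCreate()
    try fitAndPredict(spark).foreach(println)
    finally spark.stop()
  }
}
```

The same fitted PipelineModel can be applied to any DataFrame with a text column. That property is what later allows a trained model to be reused against the real-time tweet stream.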

Apache Maven

Now that we have covered some of the basic concepts, let us start coding! As in previous articles, we are going to use Apache Maven to handle our build dependencies. A couple of things to note in my POM are:

  • Apache Spark Provided Dependencies - I have marked the Apache Spark dependencies as provided because the Spark runtime already supplies them when the application is submitted, so they do not need to be bundled into our JAR.
  • Stanford NLP - I will be using the Stanford NLP Library to perform lemmatization on text. This library is written in Java and so can be used in both Java and Scala Spark Applications (I will be writing our Machine Learning Pipeline in Scala).



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project Information -->
    <groupId>io.keisan.knowledgebase.spark.mllib</groupId>
    <artifactId>keisan-spark-mllib</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <!-- Project Information -->
    <name>Example Spark Machine Learning Pipelines</name>
    <description>Keisan Knowledgebase Spark MLlib Project - Example Spark Machine Learning Pipelines</description>
    <url>https://www.keisan.io/knowledgebase/real-time-ml-pipeline-spark</url>
    <organization>
        <name>Keisan Ltd</name>
        <url>https://www.keisan.io</url>
    </organization>
    <developers>
        <developer>
            <id>jillur.quddus</id>
            <name>Jillur Quddus</name>
            <email>contactus@keisan.io</email>
            <url>https://www.keisan.io</url>
            <organization>Keisan Ltd</organization>
            <organizationUrl>https://www.keisan.io</organizationUrl>
            <roles>
                <role>Lead Engineer</role>
                <role>Data Scientist</role>
            </roles>
            <timezone>Europe/London</timezone>
        </developer>
    </developers>

    <!-- Properties -->
    <properties>
        <apache.spark.core.2.11.version>2.1.0</apache.spark.core.2.11.version>
        <apache.spark.mllib.2.11.version>2.1.0</apache.spark.mllib.2.11.version>
        <apache.spark.sql.2.11.version>2.1.0</apache.spark.sql.2.11.version>
        <jdk.version>1.8</jdk.version>
        <maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
        <maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
        <output.directory>/keisan/knowledgebase/spark/mllib/jars</output.directory>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.library.version>2.11.8</scala.library.version>
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <stanford.corenlp.version>3.7.0</stanford.corenlp.version>
    </properties>

    <!-- Dependencies -->
    <dependencies>

        <!-- Apache Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${apache.spark.core.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${apache.spark.sql.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${apache.spark.mllib.2.11.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.library.version}</version>
        </dependency>

        <!-- Stanford NLP -->
        <dependency>
            <groupId>edu.stanford.nlp</groupId>
            <artifactId>stanford-corenlp</artifactId>
            <version>${stanford.corenlp.version}</version>
        </dependency>
        <dependency>
            <groupId>edu.stanford.nlp</groupId>
            <artifactId>stanford-corenlp</artifactId>
            <version>${stanford.corenlp.version}</version>
            <classifier>models</classifier>
        </dependency>

    </dependencies>

    <!-- Project Builder -->
    <build>

        <!-- Plugins -->
        <plugins>

            <!-- Maven Compiler: Compile the Sources of the Project -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.plugins.maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Assembly: Aggregate project output with its dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.plugins.maven-assembly-plugin.version}</version>
                <configuration>

                    <!-- Final JAR Filename -->
                    <finalName>keisan-spark-mllib-${project.version}</finalName>
                    <appendAssemblyId>false</appendAssemblyId>

                    <!-- Include all Project Dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>

                    <!-- JAR with dependencies Output Target Directory -->
                    <outputDirectory>${output.directory}</outputDirectory>

                </configuration>
                <executions>

                    <!-- Bind the assembly to the package phase -->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>

                </executions>

            </plugin>

            <!-- Scala Maven Plugin -->
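            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>

                    <!-- Bind the Scala compiler to the build so that src/main/scala is compiled -->
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>

                </executions>
            </plugin>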

        </plugins>

    </build>

</project>


Pre-Processing Pipeline

As I mentioned above, I will be writing my Spark Machine Learning Pipeline using Scala. Scala runs on the JVM, which allows us to use libraries written in Java in our Scala applications. Furthermore, Scala tends to be much less verbose than Java when developing Spark applications, and is arguably easier to learn for new data scientists.

The first piece of code defines a series of Scala functions that pre-process text held in Spark DataFrames. As described above, these functions will lowercase text, remove non-meaningful punctuation, remove stop words and lemmatize text (using the Stanford CoreNLP Library).


package io.keisan.knowledgebase.spark.mllib;

import java.util.Properties;

import edu.stanford.nlp.ling.CoreAnnotations.LemmaAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation;
import edu.stanford.nlp.ling.CoreAnnotations.TokensAnnotation;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import edu.stanford.nlp.util.CoreMap;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions._;
import org.apache.spark.sql.types._;

import org.apache.spark.ml.feature.StopWordsRemover;

import scala.collection.JavaConversions._;
import scala.collection.mutable.ArrayBuffer;

/**
 * Pre-Processor Utilities / Helper Functions
 * A collection of functions to pre-process text
 *
 * @author jillur.quddus
 * @version 0.0.1
 */

object PreProcessorUtils {

  /**
   * Lowercase and Remove Punctuation
   * Lowercase and remove non-alphanumeric-space characters from the text field
   *
   * @param corpus The collection of documents as a Dataframe
   * @param textColumn The name of the column containing the text to be pre-processed
   * @return A Dataframe with the text lowercased and non-alphanumeric-space characters removed
   *
   */

  def lowercaseRemovePunctuation(corpus:Dataset[Row], textColumn:String): Dataset[Row] = {
    return corpus.withColumn(textColumn, regexp_replace(lower(corpus(textColumn)), "[^a-zA-Z0-9 ]", ""));
  }

  /**
   * Text Lemmatizer
   * Given a text string, generate a sequence of Lemmas
   *
   * @param text The text string to lemmatize
   * @param pipeline The Stanford Core NLP Pipeline
   * @return A sequence of lemmas
   *
   */

  def lemmatizeText(text: String, pipeline:StanfordCoreNLP): Seq[String] = {

    val doc = new Annotation(text);
    pipeline.annotate(doc);
    val lemmas = new ArrayBuffer[String]();
    val sentences = doc.get(classOf[SentencesAnnotation]);
    for (sentence <- sentences; token <- sentence.get(classOf[TokensAnnotation])) {
      val lemma = token.get(classOf[LemmaAnnotation])
      if (lemma.length > 2 && isOnlyLetters(lemma)) {
        lemmas += lemma.toLowerCase
      }
    }

    return lemmas;

  }

  /**
   * Check that a given String is made up entirely of alpha characters
   *
   * @param str The string to test for alpha characters
   * @return Boolean
   */

  def isOnlyLetters(str: String) = str.forall(c => Character.isLetter(c));

  /**
   * Stop Words Remover
   * Remove Stop Words from a given input column containing a sequence of String
   *
   * @param corpus The collection of documents as a Dataframe
   * @param inputColumn The name of the column containing a sequence of Strings to filter
   * @param outputColumn The name of the column to output the filtered sequence of Strings to
   * @return A Dataframe with the output column of filtered Strings appended
   */

  def stopWordRemover(corpus:Dataset[Row], inputColumn:String, outputColumn:String): Dataset[Row] = {

    val stopWordsRemover = new StopWordsRemover()
                             .setInputCol(inputColumn)
                             .setOutputCol(outputColumn);
    return stopWordsRemover.transform(corpus);

  }

}


Ingesting, Labelling and Pre-Processing the Data

We can now move on to ingesting the data that we downloaded from CrowdFlower. The data already contains a label column (i.e. the outcome), namely airline_sentiment, which is either positive, negative or neutral. We could leave the label as it is, i.e. a multi-class classification. However, we are going to make things simpler by relabelling the data with a binary classification: either negative or not-negative. Our model will then simply predict whether or not a tweet is negative about the airline in question.

To do this, in my code below I have created a new String column called negative_sentiment_label. It is true for negative-sentiment tweets (our Positive outcome) and false in all other cases (our Negative outcome, where the tweet has been classified as either neutral or positive in sentiment). After we have labelled our data, we can apply the pre-processing functions that we defined above to remove non-meaningful punctuation, lemmatize our text and remove stop words.


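package io.keisan.knowledgebase.spark.mllib;

import java.util.Properties;

import edu.stanford.nlp.pipeline.StanfordCoreNLP;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions._;
import org.apache.spark.sql.types._;

object TrainDecisionTreeClassifier {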

  def main(args: Array[String]) = {

    /********************************************************************
     * SPARK CONTEXT
     ********************************************************************/

    // Create the Spark Context
    val conf = new SparkConf()
      .setAppName("Sentiment Models")
      .setMaster("spark://<Spark Master Hostname>:7077");
    val sc = new SparkContext(conf);
    val sparkSession = SparkSession.builder().getOrCreate();
    import sparkSession.implicits._;

    /********************************************************************
     * INGEST THE CORPUS
     ********************************************************************/

    // Define the CSV Dataset Schema
    val schema = new StructType(Array(
                    StructField("unit_id", LongType, true),
                    StructField("golden", BooleanType, true),
                    StructField("unit_state", StringType, true),
                    StructField("trusted_judgements", IntegerType, true),
                    StructField("last_judgement_at", StringType, true),
                    StructField("airline_sentiment", StringType, true),
                    StructField("airline_sentiment_confidence", DoubleType, true),
                    StructField("negative_reason", StringType, true),
                    StructField("negative_reason_confidence", DoubleType, true),
                    StructField("airline", StringType, true),
                    StructField("airline_sentiment_gold", StringType, true),
                    StructField("name", StringType, true),
                    StructField("negative_reason_gold", StringType, true),
                    StructField("retweet_count", IntegerType, true),
                    StructField("text", StringType, true),
                    StructField("tweet_coordinates", StringType, true),
                    StructField("tweet_created", StringType, true),
                    StructField("tweet_id", StringType, true),
                    StructField("tweet_location", StringType, true),
                    StructField("user_timezone", StringType, true)
                 ));

    // Read the CSV Dataset, keeping only those columns that we need to build our model
    var tweetsDF = sparkSession.read
                      .format("csv")
                      .option("header", true)
                      .option("delimiter", ",")
                      .option("mode", "DROPMALFORMED")
                      .schema(schema)
                      .load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/raw/Airline-Sentiment-2-w-AA.csv")
                      .select("airline_sentiment", "text");

    /********************************************************************
     * LABEL THE DATA
     ********************************************************************/

    // We are interested in detecting tweets with negative sentiment. Let us create a new column whereby
    // if the sentiment is negative, this new column is TRUE (Positive Outcome), and FALSE (Negative Outcome)
    // in all other cases

    tweetsDF = tweetsDF.withColumn("negative_sentiment_label", when(tweetsDF("airline_sentiment") === "negative", lit("true")).otherwise(lit("false")))
                .select("text", "negative_sentiment_label");

    /********************************************************************
     * APPLY THE PRE-PROCESSING PIPELINE
     ********************************************************************/

    // Let us now perform some simple pre-processing including converting the text column to lowercase
    // and removing all non-alphanumeric characters

    val lowercasedDF = PreProcessorUtils.lowercaseRemovePunctuation(tweetsDF, "text");

    // Lemmatize the text to generate a sequence of Lemmas using the Stanford NLP Library
    // By using mapPartitions, we create the Stanford NLP Pipeline once per partition rather than once per RDD entry

    val lemmatizedDF = lowercasedDF.select("text", "negative_sentiment_label").rdd.mapPartitions(p => {

      // Define the NLP Lemmatizer Pipeline once per partition
      val props = new Properties();
      props.put("annotators", "tokenize, ssplit, pos, lemma");
      val pipeline = new StanfordCoreNLP(props);

      // Lemmatize the text and preserve the Negative Sentiment Label
      p.map{
        case Row(text: String, negative_sentiment_label:String) => (PreProcessorUtils.lemmatizeText(text, pipeline), negative_sentiment_label);
      };

    }).toDF("lemmas", "negative_sentiment_label");

    // Remove Stop Words from the sequence of Lemmas
    val stopWordsRemovedDF = PreProcessorUtils.stopWordRemover(lemmatizedDF, "lemmas", "filtered_lemmas")
                                .where(size(col("filtered_lemmas")) > 1);


Now if you run the action stopWordsRemovedDF.show(false), you should see a DataFrame with 3 columns:

  • lemmas - the sequence of lemmas derived from the original text.
  • negative_sentiment_label - our label column, which is true for negative-sentiment tweets and false for all other cases.
  • filtered_lemmas - the filtered sequence of lemmas with stop words removed.

Feature Vectors and Training Models

We are now ready to generate our training models. The first step is to convert our filtered sequence of lemmas into Feature Vectors so that we can pass them to our Estimator to generate our training models. As described above, we will be using the TF-IDF method to generate Feature Vectors where the importance of a term to a document in a corpus is taken into account, rather than just the term frequency (TF) alone.

To make things more formal, we can say that the Term Frequency TF(t,d) is the number of times that the term t appears in the document d. Likewise, the Document Frequency DF(t,D) is the number of documents in our corpus D that contain the term t. However, as I described above, simply taking the TF can over-emphasise terms that appear often but convey little meaning. The Inverse Document Frequency IDF(t,D) is a numerical measure of how important a term is, taking into account how often the term appears across the corpus. In Spark it is calculated as follows:

$$\mathrm{IDF}(t,D) = \log\frac{|D| + 1}{\mathrm{DF}(t,D) + 1}$$

In this equation, |D| is the total number of documents in the corpus D. The +1 terms are a smoothing term that avoids dividing by zero for terms that do not appear in the corpus. Because a logarithm is used, if a term appears in all documents, its IDF value becomes 0. To calculate the eventual TF-IDF measure, you multiply TF by IDF as follows:

$$\mathrm{TFIDF}(t,d,D) = \mathrm{TF}(t,d) \cdot \mathrm{IDF}(t,D)$$

In Spark's MLlib library, the Term Frequency vectors can be generated using the HashingTF Transformer, which takes a sequence of terms and converts them into fixed-length Feature Vectors by hashing each term to an index. These Feature Vectors can then be passed to the IDFModel, produced by fitting the IDF Estimator to a dataset, which scales each column based on how frequently it appears across the corpus.
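The sketch below walks through the same two steps as ModelUtils.tfidf in plain Scala so that the arithmetic is visible: terms are hashed into a fixed number of buckets and counted, then each bucket is scaled by the smoothed IDF, log((|D| + 1) / (DF + 1)). It is an illustration under stated assumptions rather than Spark's implementation: it uses scala.util.hashing.MurmurHash3.stringHash, which is not configured the same way as Spark's HashingTF, so the bucket indices will not match Spark's, and sparse vectors are modelled as plain Map[Int, Double] values. The object name TfIdfSketch is hypothetical.

```scala
import scala.util.hashing.MurmurHash3

// Plain-Scala sketch of hashed term frequencies followed by IDF scaling.
// TfIdfSketch is a hypothetical name; bucket indices differ from Spark's HashingTF.
object TfIdfSketch {

  // Map a term to a bucket in [0, numFeatures) using a non-negative modulo
  def bucket(term: String, numFeatures: Int): Int = {
    val h = MurmurHash3.stringHash(term) % numFeatures
    if (h < 0) h + numFeatures else h
  }

  // Term-frequency vector for one document: bucket index -> raw count
  def termFrequencies(doc: Seq[String], numFeatures: Int): Map[Int, Double] =
    doc.groupBy(term => bucket(term, numFeatures)).map { case (i, terms) => i -> terms.size.toDouble }

  // Smoothed IDF per bucket: log((|D| + 1) / (DF + 1)), where DF counts documents containing the bucket
  def idf(tfs: Seq[Map[Int, Double]]): Map[Int, Double] = {
    val numDocs = tfs.size.toDouble
    val docFreq = tfs.flatMap(_.keys).groupBy(identity).map { case (i, occ) => i -> occ.size }
    docFreq.map { case (i, df) => i -> math.log((numDocs + 1.0) / (df + 1.0)) }
  }

  // TF-IDF: multiply each term frequency by the IDF weight of its bucket
  def tfidf(corpus: Seq[Seq[String]], numFeatures: Int): Seq[Map[Int, Double]] = {
    val tfs = corpus.map(termFrequencies(_, numFeatures))
    val weights = idf(tfs)
    tfs.map(_.map { case (i, tf) => i -> tf * weights(i) })
  }
}
```

Calling TfIdfSketch.tfidf(corpus, 4096) mirrors numFeatures = 4096 in ModelUtils. A term that occurs in every document of the corpus, such as "flight" in a set of airline complaints, receives a weight of exactly 0, which is the logarithm behaviour described above.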

Once we have generated the scaled Feature Vectors from the sequences of lemmas, we can pass them to Estimators to train our models. In this article, we are going to train a model using the Decision Tree Classifier Estimator. In the code below, I have defined a new Scala object, ModelUtils, to hold our Model Helper Functions. The first function generates the scaled Feature Vectors from a sequence of (filtered) lemmas. The second function trains and outputs a Decision Tree Classifier using these scaled Feature Vectors and the previously defined Labels. The third function trains and outputs a Random Forest Classifier, again using the same scaled Feature Vectors and Labels. The fourth function generates multi-class metrics, such as accuracy, the confusion matrix and false positive rates, from the predictions our trained models make on test data.
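Before the helper functions themselves, it may help to see how a decision tree chooses its splits. Spark's DecisionTreeClassifier uses Gini impurity by default, and a split is preferred when it most reduces the weighted impurity of the resulting child nodes. The plain-Scala sketch below scores candidate thresholds on a single feature in this way. It is a simplification: Spark bins continuous features (see maxBins) and evaluates splits across many features in a distributed fashion, and the object name and the (feature, label) pairs used with it are hypothetical.

```scala
// Plain-Scala sketch of scoring decision-tree splits with Gini impurity.
// GiniSplitSketch is a hypothetical name; Spark's real implementation bins features
// and evaluates splits in parallel, so this is illustrative only.
object GiniSplitSketch {

  // Gini impurity of a set of labels: 1 - sum over classes of p(class)^2
  def gini(labels: Seq[Double]): Double =
    if (labels.isEmpty) 0.0
    else {
      val n = labels.size.toDouble
      1.0 - labels.groupBy(identity).values.map(g => math.pow(g.size / n, 2)).sum
    }

  // Weighted impurity of the two children produced by the split "feature <= threshold"
  def splitImpurity(data: Seq[(Double, Double)], threshold: Double): Double = {
    val (left, right) = data.partition { case (feature, _) => feature <= threshold }
    val n = data.size.toDouble
    (left.size / n) * gini(left.map(_._2)) + (right.size / n) * gini(right.map(_._2))
  }

  // Information gain: parent impurity minus weighted child impurity
  def gain(data: Seq[(Double, Double)], threshold: Double): Double =
    gini(data.map(_._2)) - splitImpurity(data, threshold)

  // Try every observed feature value as a threshold and keep the one with the highest gain
  // (data must be non-empty)
  def bestThreshold(data: Seq[(Double, Double)]): Double =
    data.map(_._1).distinct.maxBy(t => gain(data, t))
}
```

For example, with (feature, label) pairs (0.0, 0.0), (0.1, 0.0), (0.9, 1.0) and (1.2, 1.0), the parent impurity is 0.5 and splitting at 0.1 separates the two classes perfectly, so bestThreshold returns 0.1. A Random Forest repeats this kind of tree-growing many times (numTrees in ModelUtils) on resampled data and combines the trees' votes.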


package io.keisan.knowledgebase.spark.mllib;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.functions._;
import org.apache.spark.sql.types._;
import org.apache.spark.sql.Row;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;

import scala.collection.JavaConversions._;

/**
 * Predictive Model Utilities / Helper Functions
 * A collection of functions to build Predictive Models
 *
 * @author jillur.quddus
 * @version 0.0.1
 */

object ModelUtils {

  // Size of the fixed-length Feature Vectors
  val numFeatures = 4096;

  // Number of trees in our random forests
  val numTrees = 256;

  /**
   * Term Frequency-Inverse Document Frequency (TF-IDF)
   * Generate Term Frequency Feature Vectors by passing the sequence of lemmas to the HashingTF Transformer.
   * Fit the IDF Estimator to the Featurized Dataset to generate the IDFModel.
   * Pass the TF Feature Vectors to the IDFModel to scale based on frequency across the corpus
   *
   * @param corpus Dataset containing the sequence of lemmas
   * @param inputColumn The name of the column containing the sequence of (filtered) lemmas
   * @param outputColumn The name of the column to store the Scaled Feature Vectors
   * @return A DataFrame with the Scaled Feature Vectors
   *
   */

  def tfidf(corpus:Dataset[Row], inputColumn:String, outputColumn:String): Dataset[Row] = {

    // Convert the sequence of Lemmas into fixed-length feature vectors using the HashingTF Transformer
    val hashingTF = new HashingTF()
                      .setInputCol(inputColumn)
                      .setOutputCol("raw_features")
                      .setNumFeatures(numFeatures);
    val featurizedData = hashingTF.transform(corpus);

    // Take the feature vectors and scale each column based on how frequently it appears in the corpus
    val idf = new IDF().setInputCol("raw_features").setOutputCol(outputColumn);
    val idfModel = idf.fit(featurizedData);
    return idfModel.transform(featurizedData);

  }

  /**
   * Build a Decision Tree Classifier
   * Train a Decision Tree Model by supplying the training dataset that includes the label and feature vector columns
   *
   * @param featurizedDF The full DataFrame containing the labels and feature vectors
   * @param trainingDF The training split DataFrame to be used to train the Model
   * @param labelColumn The name of the column containing the labels
   * @param featuresColumn The name of the column containing the scaled feature vectors
   * @return The PipelineModel containing our trained decision tree model
   *
   */

  def trainDecisionTreeModel(featurizedDF:Dataset[Row], trainingDF:Dataset[Row], labelColumn:String,
      featuresColumn:String): PipelineModel = {

    // Index the Labels
    val labelIndexer = new StringIndexer()
                          .setInputCol(labelColumn)
                          .setOutputCol("indexed_label")
                          .fit(featurizedDF);

    // Define the Decision Tree Model
    val decisionTreeModel = new DecisionTreeClassifier()
                              .setLabelCol("indexed_label")
                              .setFeaturesCol(featuresColumn);

    // Convert the Indexed Labels back to the original Labels based on the trained predictions
    val labelConverter = new IndexToString()
                            .setInputCol("prediction")
                            .setOutputCol("predicted_label")
                            .setLabels(labelIndexer.labels);

    // Chain the Indexers and Decision Tree Model to form a Pipeline
    val pipeline = new Pipeline()
                    .setStages(Array(labelIndexer, decisionTreeModel, labelConverter));

    // Run the Indexers and Train the Model on the Training Data
    return pipeline.fit(trainingDF);

  }

  /**
   * Build a Random Forest Classifier
   * Train a Random Forest Model by supplying the training dataset that includes the label and feature vector columns
   *
   * @param featurizedDF The full DataFrame containing the labels and feature vectors
   * @param trainingDF The training split DataFrame to be used to train the Model
   * @param labelColumn The name of the column containing the labels
   * @param featuresColumn The name of the column containing the scaled feature vectors
   * @return The PipelineModel containing our trained random forest model
   *
   */

  def trainRandomForestModel(featurizedDF:Dataset[Row], trainingDF:Dataset[Row], labelColumn:String,
      featuresColumn:String): PipelineModel = {

    // Index the Labels
    val labelIndexer = new StringIndexer()
                          .setInputCol(labelColumn)
                          .setOutputCol("indexed_label")
                          .fit(featurizedDF);

    // Define a Random Forest model
    val randomForestModel = new RandomForestClassifier()
                              .setLabelCol("indexed_label")
                              .setFeaturesCol(featuresColumn)
                              .setNumTrees(numTrees);

    // Convert the Indexed Labels back to the original Labels based on the trained predictions
    val labelConverter = new IndexToString()
                            .setInputCol("prediction")
                            .setOutputCol("predicted_label")
                            .setLabels(labelIndexer.labels);

    // Chain the Indexers and Random Forest Model to form a Pipeline
    val pipeline = new Pipeline()
                      .setStages(Array(labelIndexer, randomForestModel, labelConverter));

    // Run the Indexers and Train the Model on the Training Data
    return pipeline.fit(trainingDF);

  }

  /**
   * Generate Multi-class Metrics
   * Generate multi-class metrics given a predictions dataframe containing prediction and indexed label double columns.
   * Such metrics allow us to generate classification matrices, false and true positive rates etc.
   *
   * @param predictionsDF A DataFrame containing predictions and indexed labels
   * @param predictionColumn The name of the column containing the predictions [Double]
   * @param indexedLabelColumn The name of the column containing the indexed labels [Double]
   * @return A MulticlassMetrics object that can be used to output model metrics
   *
   */

  def generateMulticlassMetrics(predictionsDF:Dataset[Row], predictionColumn:String, indexedLabelColumn:String): MulticlassMetrics = {

    // Extract (prediction, label) pairs; the pattern variables no longer shadow the column-name parameters
    val predictionAndLabels = predictionsDF.select(predictionColumn, indexedLabelColumn).rdd.map{
      case Row(prediction: Double, label: Double) => (prediction, label);
    };
    return new MulticlassMetrics(predictionAndLabels);

  }

}
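Both training functions wrap the classifier between a StringIndexer and an IndexToString. By default, StringIndexer orders labels by descending frequency, so the most frequent label receives index 0.0, and IndexToString maps predicted indices back through the same label array. The plain-Scala sketch below mimics that mapping; breaking ties alphabetically is an assumption of the sketch rather than documented Spark behaviour, and the object name is hypothetical.

```scala
// Plain-Scala sketch of StringIndexer's default frequency-based label ordering,
// paired with the reverse mapping performed by IndexToString.
// LabelIndexSketch is a hypothetical name; alphabetical tie-breaking is an assumption.
object LabelIndexSketch {

  // Distinct labels sorted by descending frequency, ties broken alphabetically
  def labelsByFrequency(labels: Seq[String]): Array[String] =
    labels.groupBy(identity).toSeq
      .sortBy { case (label, occurrences) => (-occurrences.size, label) }
      .map(_._1)
      .toArray

  // String label -> Double index, as written to the "indexed_label" column
  def index(labels: Seq[String]): Map[String, Double] =
    labelsByFrequency(labels).zipWithIndex.map { case (label, i) => label -> i.toDouble }.toMap

  // Double index -> original label, as IndexToString does with setLabels(labelIndexer.labels)
  def toLabel(orderedLabels: Array[String], prediction: Double): String =
    orderedLabels(prediction.toInt)
}
```

This ordering matters when reading the confusion matrix: whichever label is more common in the data becomes index 0.0 and therefore occupies the first row and column.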


Train a Decision Tree Classifier

Now let us return to our main application and finish it off by performing the following operations:

  • Generate scaled Feature Vectors from our sequences of filtered Lemmas
  • Randomly split our corpus into training (70%) and test (30%) datasets respectively
  • Train a Decision Tree Classifier Estimator on the training dataset
  • Apply the resulting Decision Tree Classifier Model on the test dataset allowing us to generate metrics regarding the accuracy of our Model
  • Save our Decision Tree Classifier Model to HDFS so that we can use it later in our real-time event-based streaming data pipeline
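Because the 70/30 split is random, each run trains and tests on slightly different data. If you want repeatable results, Dataset.randomSplit also accepts a seed, for example featurizedDF.randomSplit(Array(0.7, 0.3), 42L). The plain-Scala sketch below illustrates the idea behind a seeded, weight-based split: every row is assigned independently, so the split sizes are only approximately 70% and 30%. Spark samples per partition, so this hypothetical helper will not reproduce Spark's exact splits.

```scala
import scala.util.Random

// Plain-Scala sketch of a seeded, weight-based random split.
// RandomSplitSketch is a hypothetical name; Spark's Dataset.randomSplit samples
// per partition and will not produce identical splits.
object RandomSplitSketch {

  // Assign each row to the training split with probability trainFraction, otherwise to the test split
  def randomSplit[T](rows: Seq[T], trainFraction: Double, seed: Long): (Seq[T], Seq[T]) = {
    require(trainFraction > 0.0 && trainFraction < 1.0, "trainFraction must be between 0 and 1")
    val rng = new Random(seed)
    // Draw one uniform number per row, in order, so the same seed gives the same split
    val assigned = rows.map(row => (row, rng.nextDouble() < trainFraction))
    val training = assigned.collect { case (row, true) => row }
    val test = assigned.collect { case (row, false) => row }
    (training, test)
  }
}
```

Running RandomSplitSketch.randomSplit((1 to 1000).toList, 0.7, 42L) twice returns the same two lists, with roughly 700 rows in the first.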


package io.keisan.knowledgebase.spark.mllib;

import edu.stanford.nlp.pipeline.StanfordCoreNLP;

import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.functions._;
import org.apache.spark.sql.types._;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession.Builder;

import scala.collection.JavaConversions._;

/**
 * Tweet Sentiment Decision Tree Classifier
 * Train a Decision Tree Classifier on a collection of pre-labelled tweets about airlines
 *
 * @author jillur.quddus
 * @version 0.0.1
 */

object TrainDecisionTreeClassifier {

  def main(args: Array[String]) = {

    /********************************************************************
     * SPARK CONTEXT
     ********************************************************************/

    // Create the Spark Context
    val conf = new SparkConf()
      .setAppName("Sentiment Models")
      .setMaster("spark://<Spark Master Hostname>:7077");

    val sc = new SparkContext(conf);
    val sparkSession = SparkSession.builder().getOrCreate();
    import sparkSession.implicits._;

    /********************************************************************
     * INGEST THE CORPUS
     ********************************************************************/

    // Define the CSV Dataset Schema
    val schema = new StructType(Array(
                    StructField("unit_id", LongType, true),
                    StructField("golden", BooleanType, true),
                    StructField("unit_state", StringType, true),
                    StructField("trusted_judgements", IntegerType, true),
                    StructField("last_judgement_at", StringType, true),
                    StructField("airline_sentiment", StringType, true),
                    StructField("airline_sentiment_confidence", DoubleType, true),
                    StructField("negative_reason", StringType, true),
                    StructField("negative_reason_confidence", DoubleType, true),
                    StructField("airline", StringType, true),
                    StructField("airline_sentiment_gold", StringType, true),
                    StructField("name", StringType, true),
                    StructField("negative_reason_gold", StringType, true),
                    StructField("retweet_count", IntegerType, true),
                    StructField("text", StringType, true),
                    StructField("tweet_coordinates", StringType, true),
                    StructField("tweet_created", StringType, true),
                    StructField("tweet_id", StringType, true),
                    StructField("tweet_location", StringType, true),
                    StructField("user_timezone", StringType, true)
                 ));

    // Read the CSV Dataset, keeping only those columns that we need to build our model
    var tweetsDF = sparkSession.read
                      .format("csv")
                      .option("header", true)
                      .option("delimiter", ",")
                      .option("mode", "DROPMALFORMED")
                      .schema(schema)
                      .load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/raw/Airline-Sentiment-2-w-AA.csv")
                      .select("airline_sentiment", "text");

    /********************************************************************
     * LABEL THE DATA
     ********************************************************************/

    // We are interested in detecting tweets with negative sentiment. Let us create a new column whereby
    // if the sentiment is negative, this new column is TRUE (Positive Outcome), and FALSE (Negative Outcome)
    // in all other cases

    tweetsDF = tweetsDF.withColumn("negative_sentiment_label", when(tweetsDF("airline_sentiment") === "negative", lit("true")).otherwise(lit("false")))
                .select("text", "negative_sentiment_label");

    /********************************************************************
     * APPLY THE PRE-PROCESSING PIPELINE
     ********************************************************************/

    // Let us now perform some simple pre-processing including converting the text column to lowercase
    // and removing all non-alphanumeric characters

    val lowercasedDF = PreProcessorUtils.lowercaseRemovePunctuation(tweetsDF, "text");

    // Lemmatize the text to generate a sequence of Lemmas using the Stanford NLP Library
    // By using mapPartitions, we create the Stanford NLP Pipeline once per partition rather than once per RDD entry

    val lemmatizedDF = lowercasedDF.select("text", "negative_sentiment_label").rdd.mapPartitions(p => {

      // Define the NLP Lemmatizer Pipeline once per partition
      val props = new Properties();
      props.put("annotators", "tokenize, ssplit, pos, lemma");
      val pipeline = new StanfordCoreNLP(props);

      // Lemmatize the text and preserve the Negative Sentiment Label
      p.map{
        case Row(text: String, negative_sentiment_label:String) => (PreProcessorUtils.lemmatizeText(text, pipeline), negative_sentiment_label);
      };

    }).toDF("lemmas", "negative_sentiment_label");

    // Remove Stop Words from the sequence of Lemmas
    val stopWordsRemovedDF = PreProcessorUtils.stopWordRemover(lemmatizedDF, "lemmas", "filtered_lemmas")
                                .where(size(col("filtered_lemmas")) > 1);

    /********************************************************************
     * SCALED FEATURE VECTOR
     ********************************************************************/

    // Generate the Scaled Feature Vectors
    val featurizedDF = ModelUtils.tfidf(stopWordsRemovedDF, "filtered_lemmas", "features");

    /********************************************************************
     * TRAIN AND EVALUATE A DECISION TREE CLASSIFIER
     ********************************************************************/

    // Split the data into Training and Test Datasets
    val Array(trainingDF, testDF) = featurizedDF.randomSplit(Array(0.7, 0.3))

    // Train a Decision Tree Model using the Training Dataset
    val decisionTreeModel = ModelUtils.trainDecisionTreeModel(featurizedDF, trainingDF, "negative_sentiment_label", "features");

    // Apply the Decision Tree Training Model to the Test Dataset
    val decisionTreePredictions = decisionTreeModel.transform(testDF);
    decisionTreePredictions.select("negative_sentiment_label", "predicted_label", "filtered_lemmas", "features").show(false);

    // Compute the accuracy of the Decision Tree Training Model on the Test Dataset
    val decisionTreeEvaluator = new MulticlassClassificationEvaluator()
                                  .setLabelCol("indexed_label")
                                  .setPredictionCol("prediction")
                                  .setMetricName("accuracy");
    val decisionTreeAccuracy = decisionTreeEvaluator.evaluate(decisionTreePredictions);
    println("Decision Tree Test Accuracy Rate = " + decisionTreeAccuracy);
    println("Decision Tree Test Error Rate = " + (1.0 - decisionTreeAccuracy));

    // Generate a Classification Matrix
    val metrics = ModelUtils.generateMulticlassMetrics(decisionTreePredictions, "prediction", "indexed_label");
    println(metrics.confusionMatrix);

    // Generate Label Accuracy Metrics
    val labelMetrics = metrics.labels;
    labelMetrics.foreach { l =>
      println(s"False Positive Rate ($l) = " + metrics.falsePositiveRate(l));
    }

    /********************************************************************
     * SAVE THE DECISION TREE CLASSIFIER FOR REAL-TIME STREAMING
     ********************************************************************/

    // Overwrite any previously saved model so that the application can be re-run
    decisionTreeModel.write.overwrite().save("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/models/decisionTreeClassifier");

  }

}


Multi-class Metrics

The application above prints some metrics describing the accuracy of our trained Decision Tree Classifier model on the test dataset. Because we split the corpus randomly, your exact numbers may differ slightly from mine, but you should see results similar to the following:

  • Test Accuracy Rate: 71%
  • Test Error Rate: 29%


along with the following Classification Matrix:

2285.0 293.0
945.0 681.0


Each row of this Classification Matrix corresponds to an actual label and each column to a predicted label, with the negative-sentiment label first because StringIndexer assigns index 0.0 to the most frequent label. The matrix tells us that out of 4,204 test observations, the trained Decision Tree Classifier model gets 2,285 predictions correct where it predicts negative sentiment and the test tweet actually is negative in sentiment (True Positives). The model gets a further 681 predictions correct where it predicts not-negative sentiment and the test tweet actually is not negative in sentiment (True Negatives). The model therefore gets 2,966 predictions correct out of a total of 4,204 test observations, an accuracy rate of 71%.

However, following the opposite diagonal of the Classification Matrix, the model gets 293 predictions incorrect where it predicts not-negative sentiment but the test tweet is actually negative in sentiment (False Negatives). The model gets a further 945 predictions incorrect where it predicts negative sentiment but the test tweet is actually not negative in sentiment (False Positives). The model therefore gets 1,238 predictions incorrect out of a total of 4,204 test observations, an error rate of 29%.
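The arithmetic above can be checked, and extended with per-class rates, in a few lines of plain Scala. The sketch assumes the layout used by MulticlassMetrics.confusionMatrix (rows are actual labels, columns are predicted labels, in ascending label order) and that index 0.0 is the negative-sentiment label ("true"), as in the reading above; the counts are the ones printed in this article. It also computes the accuracy of always predicting negative sentiment, a useful baseline for judging the 71% figure. The object and field names are hypothetical.

```scala
// Plain-Scala sketch: derive metrics from the 2x2 confusion matrix printed above.
// Assumed layout: rows = actual label, columns = predicted label, index 0 = negative sentiment.
// ConfusionMatrixSketch and Counts are hypothetical names.
object ConfusionMatrixSketch {

  final case class Counts(tp: Double, fn: Double, fp: Double, tn: Double) {
    def total: Double = tp + fn + fp + tn
    def accuracy: Double = (tp + tn) / total
    def errorRate: Double = 1.0 - accuracy
    // Of the tweets predicted negative, the share that really are negative
    def precision: Double = tp / (tp + fp)
    // Of the truly negative tweets, the share the model caught
    def recall: Double = tp / (tp + fn)
    // Of the not-negative tweets, the share wrongly flagged as negative
    def falsePositiveRate: Double = fp / (fp + tn)
    // Accuracy of a trivial model that labels every tweet as negative
    def alwaysNegativeBaseline: Double = (tp + fn) / total
  }

  // Row 0 = actual negative, row 1 = actual not-negative; column 0 = predicted negative
  def fromMatrix(m: Array[Array[Double]]): Counts =
    Counts(tp = m(0)(0), fn = m(0)(1), fp = m(1)(0), tn = m(1)(1))

  val articleMatrix: Array[Array[Double]] = Array(Array(2285.0, 293.0), Array(945.0, 681.0))
}
```

With the counts above, this gives an accuracy of about 0.706, a precision of about 0.707 and a recall of about 0.886 for the negative-sentiment label, a false positive rate of about 0.581, and an always-negative baseline of about 0.613. In other words, the tree beats simply labelling every tweet as negative by roughly nine percentage points, while still flagging more than half of the not-negative tweets as negative.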

So our Decision Tree Classifier correctly predicts whether a tweet is negative or not-negative about 71% of the time. We will now apply our trained model to our real-time event-based streaming data pipeline to try to predict the sentiment of airline tweets as they are created!

Apache Kafka Producer

In my last article, we updated our data pipeline to use an Apache Kafka Producer to collect tweets using Twitter's Hosebird Client within a Twitter4j wrapper, where the tweets were serialised using the Avro library and Bijection. If you run that Producer, you should see tweets being published to the Kafka Topic.

Apache Spark Consumer Streaming Application

We are now going to create an Apache Spark Consumer Streaming Application that will consume the Avro tweets from the Kafka Topic. We are then going to extend our high-level real-time data processing pipeline by passing those tweets through the same pre-processing pipeline as above, before applying our trained Decision Tree Classifier to them to try to classify their underlying sentiment in near real-time!


I will be writing our Spark Consumer Streaming Application in the same project in which we trained our Decision Tree Classifier above. Alternatively, you could create a new project for the Consumer Streaming Application; it's up to you. As I am using the same project, I need to update my Maven POM to include some additional dependencies, namely Apache Avro, Apache Spark Streaming (provided), the Spark Streaming Kafka 0.8 integration (spark-streaming-kafka-0-8), Apache Kafka (note that I will be using version 0.8.2.2, which is compatible with Spark 2.1.0 Streaming), Bijection and my Kafka Twitter Producer application from my last article (where I defined the Avro Tweet Schema that I will need again when consuming from the Kafka Topic and parsing the message):



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project Information -->
    <groupId>io.keisan.knowledgebase.spark.mllib</groupId>
    <artifactId>keisan-spark-mllib</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <!-- Project Information -->
    <name>Example Spark Predictive Models</name>
    <description>Keisan Knowledgebase Spark MLlib Project - Example Spark Machine Learning Pipelines</description>
    <url>https://www.keisan.io/knowledgebase/real-time-ml-pipeline-spark</url>
    <organization>
        <name>Keisan Ltd</name>
        <url>https://www.keisan.io</url>
    </organization>
    <developers>
        <developer>
            <id>jillur.quddus</id>
            <name>Jillur Quddus</name>
            <email>contactus@keisan.io</email>
            <url>https://www.keisan.io</url>
            <organization>Keisan Ltd</organization>
            <organizationUrl>https://www.keisan.io</organizationUrl>
            <roles>
                <role>Lead Engineer</role>
                <role>Data Scientist</role>
            </roles>
            <timezone>Europe/London</timezone>
        </developer>
    </developers>

    <!-- Properties -->
    <properties>
        <apache.avro.version>1.8.1</apache.avro.version>
        <apache.kafka.2.11.version>0.8.2.2</apache.kafka.2.11.version>
        <apache.spark.core.2.11.version>2.1.0</apache.spark.core.2.11.version>
        <apache.spark.mllib.2.11.version>2.1.0</apache.spark.mllib.2.11.version>
        <apache.spark.sql.2.11.version>2.1.0</apache.spark.sql.2.11.version>
        <apache.spark.streaming.2.11.version>2.1.0</apache.spark.streaming.2.11.version>
        <apache.spark.streaming.kafka-0-8_2.11.version>2.1.0</apache.spark.streaming.kafka-0-8_2.11.version>
        <bijection.avro.2.11.version>0.9.5</bijection.avro.2.11.version>
        <jdk.version>1.8</jdk.version>
        <keisan.kafka.producers.version>0.0.1-SNAPSHOT</keisan.kafka.producers.version>
        <maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
        <maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
        <output.directory>/keisan/knowledgebase/spark/mllib/jars</output.directory>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.library.version>2.11.8</scala.library.version>
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <stanford.corenlp.version>3.7.0</stanford.corenlp.version>
    </properties>

    <!-- Dependencies -->
    <dependencies>

        <!-- Apache Avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${apache.avro.version}</version>
        </dependency>

        <!-- Apache Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${apache.kafka.2.11.version}</version>
        </dependency>

        <!-- Apache Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${apache.spark.core.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${apache.spark.mllib.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${apache.spark.sql.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${apache.spark.streaming.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${apache.spark.streaming.kafka-0-8_2.11.version}</version>
        </dependency>

        <!-- Bijection -->
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.11</artifactId>
            <version>${bijection.avro.2.11.version}</version>
        </dependency>

        <!-- Keisan Kafka Producers -->
        <dependency>
            <groupId>io.keisan.knowledgebase.kafka.producers</groupId>
            <artifactId>keisan-kafka-producers</artifactId>
            <version>${keisan.kafka.producers.version}</version>
        </dependency>

        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.library.version}</version>
        </dependency>

        <!-- Stanford NLP -->
        <dependency>
            <groupId>edu.stanford.nlp</groupId>
            <artifactId>stanford-corenlp</artifactId>
            <version>${stanford.corenlp.version}</version>
        </dependency>
        <dependency>
            <groupId>edu.stanford.nlp</groupId>
            <artifactId>stanford-corenlp</artifactId>
            <version>${stanford.corenlp.version}</version>
            <classifier>models</classifier>
        </dependency>

    </dependencies>

    <!-- Project Builder -->
    <build>

        <!-- Plugins -->
        <plugins>

            <!-- Maven Compiler: Compile the Sources of the Project -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.plugins.maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Assembly: Aggregate project output with its dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.plugins.maven-assembly-plugin.version}</version>
                <configuration>

                    <!-- Final JAR Filename -->
                    <finalName>keisan-spark-mllib-${project.version}</finalName>
                    <appendAssemblyId>false</appendAssemblyId>

                    <!-- Include all Project Dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>

                    <!-- JAR with dependencies Output Target Directory -->
                    <outputDirectory>${output.directory}</outputDirectory>

                </configuration>
                <executions>

                    <!-- Bind the assembly to the package phase -->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>

                </executions>

            </plugin>

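            <!-- Scala Maven Plugin: Compile the Scala Sources of the Project -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>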

        </plugins>

    </build>

</project>


Spark Streaming Context

I will be writing the Spark Consumer Streaming Application in Scala; the Java equivalent would be similar in structure, albeit much more verbose. The first step is to create the Spark Streaming Context. I will pass the normal Spark Context to one of the StreamingContext constructors to create the Streaming Context, using a batch interval of 5 seconds in my example.
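With a 5-second batch interval, Spark Streaming cuts the incoming stream into one micro-batch per interval and processes each micro-batch as an RDD. The plain-Scala sketch below models only that grouping, using made-up arrival times in milliseconds and a hypothetical object name. In practice Spark forms each batch from whatever data is available when the interval elapses (for the Kafka direct stream, the offsets published since the previous batch), not from timestamps inside the records.

```scala
// Conceptual plain-Scala sketch of micro-batching into fixed 5-second intervals.
// MicroBatchSketch is a hypothetical name; arrival times are illustrative and assumed non-negative.
object MicroBatchSketch {

  val batchIntervalMs: Long = 5000L

  // Start time of the batch interval that an arrival time falls into
  def batchStart(arrivalMs: Long, intervalMs: Long = batchIntervalMs): Long =
    (arrivalMs / intervalMs) * intervalMs

  // Group (arrivalMs, payload) pairs into batches keyed by batch start, in time order
  def toBatches[T](events: Seq[(Long, T)], intervalMs: Long = batchIntervalMs): Seq[(Long, Seq[T])] =
    events
      .groupBy { case (t, _) => batchStart(t, intervalMs) }
      .toSeq
      .sortBy(_._1)
      .map { case (start, evs) => start -> evs.sortBy(_._1).map(_._2) }
}
```

Events arriving at 1,200 ms and 4,999 ms land in the batch starting at 0 ms, while an event at 5,000 ms opens the next batch. A shorter interval lowers latency but means more, smaller jobs for Spark to schedule.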


  // Spark Streaming Context
  val conf = new SparkConf()
      .setAppName("Streaming Twitter Sentiment Classifier")
      .setMaster("spark://<Spark Master Hostname>:7077");
  val sc = new SparkContext(conf);
  val ssc = new StreamingContext(sc, Seconds(5));
  val sparkSession = SparkSession.builder().getOrCreate();
  import sparkSession.implicits._;


Loading the Pipeline Model

The second step is to load the trained Decision Tree Classifier Pipeline Model that we previously persisted to HDFS. This is very simple:


    // Load the Trained Decision Tree Classifier
    val decisionTreeModel = PipelineModel.read.load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/models/decisionTreeClassifier");


Real-time Streaming Sentiment Analysis

We are now ready to write our main Consumer Streaming application. Our Spark Streaming application will execute the following operations:

  • Kafka Direct Stream - We will periodically query Kafka for the latest offsets in each Topic and Partition, using the DefaultDecoder to return each message as the raw array of bytes representing our Avro Tweet. The result is a discretized stream (DStream).
  • Bijection - We will then process the discretized stream by inverting each raw array of bytes using Bijection and the Tweet Schema defined in my Kafka Twitter Producer, resulting in an RDD of Rows which we will then convert into a DataFrame containing our parsed tweet fields, including the core text of the tweet.
  • Pre-Processing - Next, we will apply the same pre-processing pipeline to these DataFrames that we applied when we trained the Decision Tree Classifier, i.e. lowercase the text, remove non-meaningful punctuation, lemmatize the text using Stanford's CoreNLP library and finally remove stop-words from the resulting sequence of lemmas.
  • Scaled Feature Vectors - Next, we will generate scaled Feature Vectors from these filtered sequences of lemmas using TF-IDF.
  • Decision Tree Classifier Predictions - Finally, we will pass these scaled Feature Vectors to the trained Decision Tree Classifier that we loaded earlier to predict whether each tweet is negative or not-negative in sentiment!
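The first step in the list, the Kafka Direct Stream, is the least obvious. Rather than relying on a receiver, each batch asks Kafka for the latest offset of every topic-partition and reads the half-open range between the offset consumed previously and that latest offset. The plain-Scala sketch below models that bookkeeping with hypothetical TopicPartition and PartitionRange types and made-up offsets. In the real integration, Spark's own OffsetRange objects play this role and can be inspected through HasOffsetRanges; starting an unseen partition at its latest offset is an assumption of the sketch.

```scala
// Conceptual plain-Scala sketch of direct-stream offset bookkeeping.
// DirectStreamOffsetsSketch, TopicPartition and PartitionRange are hypothetical names
// standing in for Spark's OffsetRange; offsets are made up.
object DirectStreamOffsetsSketch {

  final case class TopicPartition(topic: String, partition: Int)

  // Half-open range [fromOffset, untilOffset) of messages to read for one partition in one batch
  final case class PartitionRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Plan one batch: read from the last consumed offset up to the latest offset reported by Kafka.
  // Returns this batch's ranges and the offsets from which the next batch should start.
  def planBatch(
      consumed: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long]): (Seq[PartitionRange], Map[TopicPartition, Long]) = {
    val ranges = latest.toSeq.map { case (tp, until) =>
      // Unseen partition: start at the latest offset, so there is no backlog to read
      val from = consumed.getOrElse(tp, until)
      PartitionRange(tp, from, math.max(from, until))
    }
    (ranges, ranges.map(r => r.tp -> r.untilOffset).toMap)
  }
}
```

Because each batch is defined purely by its offset ranges, a failed batch can be recomputed by re-reading exactly the same messages from Kafka.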



package io.keisan.knowledgebase.spark.mllib;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

import edu.stanford.nlp.pipeline.StanfordCoreNLP;

import io.keisan.knowledgebase.kafka.producers.TwitterProducer;

import java.util.HashMap;
import java.util.Properties;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.rdd.RDD;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions._;
import org.apache.spark.sql.types._;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.collection.JavaConversions._;

/**
 * Streaming Twitter Sentiment Classifier
 * Consume and deserialise Avro Tweets from Kafka Topics. Apply the same pre-processing
 * pipeline to the stream of tweets as our Decision Tree Classifier trainer and generate
 * scaled Feature Vectors. Finally apply the trained Decision Tree Classifier on the
 * stream of tweets to predict their sentiment in near real-time.
 *
 * Usage: KafkaStreamingSentimentClassifier <brokers> <topics>
 *  brokers: List of one or more Kafka Brokers
 *  topics: List of one or more Kafka Topics to consume from
 *
 * @author jillur.quddus
 * @version 0.0.1
 */

object KafkaStreamingSentimentClassifier {

  // Spark Streaming Context
  val conf = new SparkConf()
      .setAppName("Streaming Twitter Sentiment Classifier")
      .setMaster("spark://<Spark Master Hostname>:7077");
  val sc = new SparkContext(conf);
  val ssc = new StreamingContext(sc, Seconds(5));
  val sparkSession = SparkSession.builder().getOrCreate();
  import sparkSession.implicits._;

  // Load the Trained Decision Tree Classifier
  val decisionTreeModel = PipelineModel.read.load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/models/decisionTreeClassifier");

  /**
   * Consume and deserialise Avro messages from the Kafka Topics using a Direct Stream Approach.
   * Pass the stream of tweets through the same pre-processing pipeline as the training dataset
   * and generate scaled Feature Vectors. Pass the scaled Feature Vectors to the trained
   * Decision Tree Classifier to predict the underlying sentiment.
   *
   * @param args List of Kafka Brokers and Topics to consume from
   */

  def main(args: Array[String]) = {

    if (args.length < 2) {
      System.err.println("Usage: KafkaStreamingSentimentClassifier <brokers> <topics>");
      System.exit(1);
    }

    // Take only the first two arguments so that any extra arguments do not cause a MatchError
    val Array(brokers, topics) = args.take(2);

    /********************************************************************
     * KAFKA CONSUMER DSTREAM
     ********************************************************************/

    // Specify the Kafka Broker Options and set of Topics
    val kafkaParameters = Map[String, String]("metadata.broker.list" -> brokers);
    val topicSet = topics.split(",").toSet;

    // Create an input DStream using KafkaUtils and the DefaultDecoder to provide the raw array of Bytes
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc, kafkaParameters, topicSet);

    /********************************************************************
     * DESERIALISE USING INVERSION, PRE-PROCESS AND CLASSIFY
     ********************************************************************/

    // Deserialise the Avro messages using Bijection and the Tweet Schema
    messages
      .map(message => {

        val parser = new Schema.Parser();
        val schema = parser.parse(TwitterProducer.TWEET_SCHEMA);
        val recordInjection = GenericAvroCodecs.toBinary[GenericRecord](schema);
        val record = recordInjection.invert(message._2).get;
        Row(record.get("id").toString(), record.get("user_name").toString(), record.get("text").toString());

      })
      .foreachRDD(rdd => {

        // Check that the RDD is neither null nor empty - otherwise applying the predictive model will raise an IllegalStateException
        if (rdd != null && !rdd.isEmpty()) {

            // Convert the RDD of Rows to a DataFrame of Tweet ID, Username and Text
            val tweetsDF = sparkSession.createDataFrame(rdd,
                new StructType().add("id", StringType)
                  .add("user_name", StringType)
                  .add("text", StringType));

            /********************************************************************
             * APPLY THE SAME PRE-PROCESSING PIPELINE TO THE REAL-TIME TWEETS
             ********************************************************************/

            // Lowercase and remove punctuation
            val lowercasedDF = PreProcessorUtils.lowercaseRemovePunctuation(tweetsDF, "text");

            // Lemmatize
            val lemmatizedDF = lowercasedDF.select("user_name", "text").rdd.mapPartitions(p => {
                val props = new Properties();
                props.put("annotators", "tokenize, ssplit, pos, lemma");
                val pipeline = new StanfordCoreNLP(props);
                p.map{
                    case Row(user_name:String, text: String) => (user_name, text, PreProcessorUtils.lemmatizeText(text, pipeline));
                };
            }).toDF("user_name", "text", "lemmas");

            // Remove Stop Words from the sequence of Lemmas
            val stopWordsRemovedDF = PreProcessorUtils.stopWordRemover(lemmatizedDF, "lemmas", "filtered_lemmas")
                                        .where(size(col("filtered_lemmas")) > 1);

            /********************************************************************
             * SCALED FEATURE VECTORS
             ********************************************************************/

            // Generate the Scaled Feature Vectors
            val featurizedDF = ModelUtils.tfidf(stopWordsRemovedDF, "filtered_lemmas", "features");

            /********************************************************************
             * APPLY TRAINED DECISION TREE CLASSIFIER TO THE REAL-TIME TWEETS
             ********************************************************************/

            // Apply the Decision Tree Classifier to the scaled Feature Vectors
            // Note that the Decision Tree Classifier was trained using a column called "features".
            // A column named "features" MUST also be present on the new dataset in order to make
            // predictions. If it is called anything else, an exception will be raised.
            val predictions = decisionTreeModel.transform(featurizedDF);

            // Output the original tweet's username, text and predicted label
            predictions.select("user_name", "text", "predicted_label").show(false);

        }

      });

    // Start the computation
    ssc.start();

    // Wait for the computation to terminate
    ssc.awaitTermination();

  }

}
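The lemmatization step in the application above uses mapPartitions rather than map so that the costly StanfordCoreNLP pipeline is constructed once per partition of each micro-batch, rather than once per tweet. The plain-Scala sketch below imitates that pattern without Spark, purely to show the difference in the number of constructions. `ExpensiveAnnotator` is a hypothetical stand-in for StanfordCoreNLP, and partitions are modelled as ordinary Seqs (Spark itself hands each partition to the function as an Iterator).

```scala
// Plain-Scala illustration (no Spark) of per-partition initialisation.
// ExpensiveAnnotator is a hypothetical stand-in for StanfordCoreNLP, and
// partitions are modelled as ordinary Seqs rather than RDD partitions.
object PerPartitionInitSketch {

  // Number of annotators constructed so far
  var constructed: Int = 0

  final class ExpensiveAnnotator {
    constructed += 1

    // Stand-in for lemmatization: lowercase and split on whitespace
    def annotate(text: String): Seq[String] =
      text.toLowerCase.split("\\s+").toSeq.filter(_.nonEmpty)
  }

  // Like rdd.map: a new annotator is built for every record
  def perRecord(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.flatMap(p => p.map(text => new ExpensiveAnnotator().annotate(text)))

  // Like rdd.mapPartitions: one annotator per partition, reused for its records
  def perPartition(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.flatMap { p =>
      val annotator = new ExpensiveAnnotator()
      p.map(annotator.annotate)
    }
}
```

With two partitions holding five tweets in total, both functions produce the same output, but the per-record version builds five annotators while the per-partition version builds only two. For a heavyweight object such as a CoreNLP pipeline, which loads its models on construction, that difference matters a great deal at streaming rates.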


We can now run our end-to-end machine learning pipeline! First, I am going to update my Kafka Twitter Producer slightly so that it filters on popular airlines here in the UK, including British_Airways, Ryanair and easyJet. Assuming that my Kafka cluster is online and that my Kafka Producer is running, I can build my Spark Consumer Streaming Application using Maven and deploy it to my Spark cluster using the following command:


# Deploy the Spark Consumer Streaming Application to the Spark Cluster
bin/spark-submit \
  --class io.keisan.knowledgebase.spark.mllib.KafkaStreamingSentimentClassifier \
  --master spark://<Spark Master Hostname>:6066 \
  --deploy-mode cluster \
  /keisan/knowledgebase/spark/mllib/jars/keisan-spark-mllib-0.0.1-SNAPSHOT.jar \
  <Kafka Broker Hostname>:9092 twitter
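The airline filtering mentioned above is configured in the Kafka Twitter Producer, which is not shown in this article. Purely to illustrate the idea, the plain-Scala sketch below applies a comparable keyword filter on the client side. `AirlineFilterSketch` and its matching rules are hypothetical: it keeps a tweet when any token, ignoring case and a leading '@' or '#', equals one of the tracked terms, and it does not reproduce the exact matching that Twitter's streaming API applies to track terms.

```scala
// Hypothetical client-side keyword filter, for illustration only. It keeps a
// tweet if any token (ignoring case and a leading '@' or '#') equals a tracked
// term; it does NOT reproduce Twitter's server-side 'track' matching rules.
object AirlineFilterSketch {

  // Tracked terms, lowercased (the airlines mentioned in the article)
  val TrackedTerms: Set[String] = Set("british_airways", "ryanair", "easyjet")

  def mentionsTrackedAirline(text: String): Boolean =
    text.toLowerCase
      .split("[^a-z0-9_@#]+")                                  // split on anything outside a handle/hashtag token
      .map(token => token.stripPrefix("@").stripPrefix("#"))   // treat @easyJet and #easyJet as easyjet
      .exists(TrackedTerms.contains)
}
```

Filtering at the producer, as the article does, has the advantage that irrelevant tweets never reach Kafka or Spark at all; a client-side check like this one would only be a secondary safeguard.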


Assuming that all goes well, if you check the standard output of your Spark Driver you should see Spark DataFrames being printed, each containing the tweet's username, text and predicted label. Remember that in our case the true outcome is that the tweet is negative in sentiment, and the false outcome is that the tweet is not-negative (i.e. positive or neutral) in sentiment. Here are some tweets that have been automatically classified using our Decision Tree Classifier:

As you can see, our Decision Tree Classifier isn't perfect, but in most cases it has correctly predicted the underlying sentiment of the tweet! Airlines, and other organisations, can use predictive models like this to learn, in near real-time, how they might improve their services and products.
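To put a number on "in most cases", one option is to hand-label a sample of streamed tweets and compare those labels with the classifier's output. The sketch below is a hedged illustration of that idea rather than part of the original pipeline: it assumes each result has already been reduced to a pair of booleans (true meaning negative, matching the true outcome described above) and computes overall accuracy, plus precision and recall for the negative class. `SentimentEvaluationSketch` and its types are hypothetical names; for DataFrames that already carry a label column, Spark ML's MulticlassClassificationEvaluator provides comparable metrics.

```scala
// Minimal sketch for measuring classifier quality on a hand-labelled sample.
// Assumption: each result is a pair of booleans where true means "negative",
// so the negative class is treated as the positive class for precision/recall.
object SentimentEvaluationSketch {

  // predicted: classifier said negative; actual: hand label says negative
  final case class Outcome(predicted: Boolean, actual: Boolean)

  final case class Metrics(accuracy: Double, precision: Double, recall: Double)

  def evaluate(outcomes: Seq[Outcome]): Metrics = {
    val truePositives  = outcomes.count(o => o.predicted && o.actual)
    val falsePositives = outcomes.count(o => o.predicted && !o.actual)
    val falseNegatives = outcomes.count(o => !o.predicted && o.actual)
    val correct        = outcomes.count(o => o.predicted == o.actual)

    // Guard against division by zero on empty or one-sided samples
    def ratio(num: Int, den: Int): Double = if (den == 0) 0.0 else num.toDouble / den

    Metrics(
      accuracy = ratio(correct, outcomes.size),
      precision = ratio(truePositives, truePositives + falsePositives),
      recall = ratio(truePositives, truePositives + falseNegatives)
    )
  }
}
```

Precision answers "when the model flags a tweet as negative, how often is it right?", while recall answers "of the genuinely negative tweets, how many did it catch?". For an airline monitoring complaints, recall is often the more important of the two.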

In this article, we have successfully developed a near real-time, high-throughput, reliable and fault-tolerant machine learning pipeline capable of making predictions on event-based data as it is streamed and received. In future articles, I will explore other types of supervised and unsupervised algorithms and how to incorporate them into real-time pipelines too.
