Keisan

Real-Time Data Pipeline with Apache Kafka and Spark

How to build a data pipeline to process event-based data in real-time

It was in 2012 that I first heard the terms 'Hadoop' and 'Big Data'. At the time, the two terms were almost synonymous - I would frequently attend meetings where clients wanted a 'Big Data' solution simply because it had become the latest buzzword, with little or no consideration as to whether their requirements and data actually warranted one. 'Big Data' is of course more than just Hadoop, and as scalable technologies in both batch and real-time became more mature, so did our knowledge of them. However, if you are now taking your first steps into the world of 'Big Data' technologies, as I was back in 2012, you may be asking yourself the same question that I was asking - namely, just what on earth is the difference between all of these new technologies, and when should I choose one over the other?

To name but a few that you may have heard of before stumbling onto this article (in no particular order, just as I recall them): Apache Hadoop, Apache Hive, Apache Pig, Apache HBase, Apache Spark, Apache Storm, Apache Kafka, Apache Flume, Apache Cassandra, MongoDB, Redis, Oracle Berkeley DB, Akka, Spray (superseded by Akka HTTP), Apache TinkerPop, Apache Giraph, Apache Mahout, Apache ZooKeeper, Couchbase, Apache Flink, Elasticsearch, Elassandra, Apache Solr, Voldemort (yes, you read that correctly), MemcacheDB, DynamoDB... and so forth.

Over the coming weeks and months, I hope to write about each one of these technologies in a series of Knowledge Base articles, focusing on real-world situations in which you may decide to use them and most importantly using hands-on examples. As the first article in the series, I would like to dedicate it to those of you taking your first steps into the world of big-data technologies by covering a typical real-time data processing scenario whilst introducing some very important streaming technologies, namely Apache Flume, Apache Kafka and Apache Spark.

The Goal

Our goal in this article will be to create a high-throughput, scalable, reliable and fault-tolerant data pipeline capable of fetching event-based data and streaming those events to Apache Spark, which will parse their content, all of which will be done in near real-time. (In my next article, I will discuss how we can use our real-time data pipeline to perform analytics and build predictive models using this data stream in Apache Spark!)


The 'Hello World' of Real-Time Streaming Data Flows

The examples that I describe below will use tweets as their real-time data source. Twitter is a great example to start with as it is free, it produces an abundant amount of data in real-time, and Twitter users can set up their own applications to access the stream of tweets made available by Twitter. So before continuing with this article, please make sure that you have a Twitter account. Once you have set up your Twitter account, head over to Twitter's Application Management page to create and register your application. Assuming that you have done this correctly, you will be provided with the following authentication information that you will need later on:

  • Consumer Key / API Key (e.g. A1b2C3DEfGH4IjkL5mnopq678)
  • Consumer Secret / API Secret (e.g. Zy1XW2vUtS3R4QPON5mLK6JiHG7FeD8CBaAbC9876DEf543Ghi)
  • Access Token (e.g. 1234567890-5mLK6JiHG7FeD8CBaAbC9876DEf543GA77888dH)
  • Access Token Secret (e.g. aAbC9876DEf543GA77888dHD8CBaAbC9876D5mLK6JiHG7)


Apache Flume

So let us begin with one of the relatively older technologies, Apache Flume. As stated on its website, "Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application". So what does this all mean? Well, in practical terms, Flume allows us to collect (ingest), aggregate and move large amounts of streaming data from multiple sources (such as the Twitter stream of tweets or log data) into Hadoop (with which Flume is tightly integrated), where we can store it using the Hadoop Distributed File System (HDFS) and from where we can analyse it. A common use case of Flume is to act as a data pipeline to ingest simple event-based data into Hadoop and HBase, but it also supports other technologies and centralised data stores out-of-the-box. Let us break it down further by studying its core components:



  • Event - The data event itself that is processed by Flume, such as a tweet or an entry in a log file.
  • Source - The consumer of the event data through which data enters into Flume. For a tweet, this may be the API or connector linking your application to the Twitter stream such as org.apache.flume.source.twitter.TwitterSource. For log entries, this may be a logging framework such as log4j.
  • Channel - A passive store that acts as the conduit between the Source and a consumer of its event data (i.e. the Sink - 'sinks drain the channels'). Common channels to keep the event until it is consumed by a Flume Sink include the local file system (File Channel) or memory (Memory Channel). Event data can be stored in one or more channels before it is 'drained' by a Flume Sink.
  • Sink - The mechanism by which the event data is consumed/removed from the Channel and delivered to its destination. A common Sink is a HDFS Sink that persists the event data to the HDFS.
  • Agent - A Flume instance i.e. a collection of Sources, Channels and Sinks running in a JVM.
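
To make the relationship between these components concrete, here is a minimal sketch in plain Java of a Source filling a bounded Channel and a Sink draining it. It is a toy model only: the class and method names (FlumeAgentSketch, MemoryChannelSketch, runSource, runSink) are my own inventions for illustration and are not part of the Flume API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of a Flume Agent: a Source puts events into a Channel
// and a Sink drains them. None of these types are Flume classes.
class FlumeAgentSketch {

    // Channel: a passive, bounded buffer between the Source and the Sink.
    static final class MemoryChannelSketch {
        private final BlockingQueue<String> queue;

        MemoryChannelSketch(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        // Returns false when the channel is full so that the Source can back off.
        boolean put(String event) {
            return queue.offer(event);
        }

        // Returns null when there is nothing left to drain.
        String take() {
            return queue.poll();
        }
    }

    // Source: pushes incoming events into the channel and reports how many were accepted.
    static int runSource(List<String> incoming, MemoryChannelSketch channel) {
        int accepted = 0;
        for (String event : incoming) {
            if (channel.put(event)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Sink: drains the channel and delivers each event to its destination (here, a list).
    static List<String> runSink(MemoryChannelSketch channel) {
        List<String> delivered = new ArrayList<>();
        String event;
        while ((event = channel.take()) != null) {
            delivered.add(event);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // A capacity of 2 means the third event is rejected until the Sink drains the channel.
        MemoryChannelSketch channel = new MemoryChannelSketch(2);
        int accepted = runSource(Arrays.asList("tweet-1", "tweet-2", "tweet-3"), channel);
        System.out.println("accepted=" + accepted);
        System.out.println("delivered=" + runSink(channel));
    }
}
```

The bounded queue plays the role of the Channel capacity: when the Sink is slower than the Source, the Channel fills up and the Source has to wait or retry, which is the kind of decoupling that a real Channel provides.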


One of the great things about Flume is that Sinks can forward event data to the Flume Source of another Flume Agent i.e. Agents can be chained together to form complex data flows. Sources and Sinks within the same Agent run asynchronously with the events stored in the Channel. Furthermore, Flume guarantees delivery of messages from one Flume Agent to the next by starting separate transactions on both the delivering and receiving Agents respectively. Finally, Flume can be scaled horizontally with ease as there is no central co-ordinator node and Flume Agents run independently of each other with no single point of failure. All this makes Apache Flume a powerful service for high-throughput streams of real-time event-based data to feed your Big Data system.



We are now ready to configure and deploy our first Flume Agent! Our first Flume Agent will fetch tweets from the Twitter Stream via the Twitter Application that you created above using a demo Flume Twitter Source class that is bundled with Flume out-of-the-box. This demo Twitter Source connects to the Twitter Stream and continuously downloads a sample of tweets, converts them to Avro format and sends these Avro events to our Flume Sink. We will be using a Memory Channel from which a Logger Sink will consume the tweets and output them to the Console. Note that I am using a CentOS 7 Minimal Installation Server to perform the commands below, but Flume should work equally well with other Linux distributions.


# Unpack the Flume Binary to a directory of choice
tar -xzf apache-flume-1.7.0-bin.tar.gz

# Create a new Flume Configuration File to configure our Flume Twitter Agent
vi conf/flume-twitter.conf

    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'TwitterAgent'

    # Flume Instance - Twitter Agent
    TwitterAgent.sources = Twitter
    TwitterAgent.channels = MemChannel
    TwitterAgent.sinks = Logger

    # Source Configuration - Inbuilt TwitterSource
    TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
    TwitterAgent.sources.Twitter.channels = MemChannel
    TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
    TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
    TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
    TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
    TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>

    # Channel Configuration - Memory Channel
    TwitterAgent.channels.MemChannel.type = memory
    TwitterAgent.channels.MemChannel.capacity = 10000
    TwitterAgent.channels.MemChannel.transactionCapacity = 100

    # Sink Configuration - Logger Sink
    TwitterAgent.sinks.Logger.type = logger
    TwitterAgent.sinks.Logger.channel = MemChannel

# Launch the Flume Agent to sink to the Console
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf -Dflume.root.logger=DEBUG,console


Assuming that all goes well, the Logger Sink should output the Tweets in near real-time to the Console in JSON formatting as follows (since it is election time here in the UK, I used election themed keywords such as ge2017, election and so forth):



Currently we have set up a Logger Sink and have directed the logger to output to the console in our launch command using -Dflume.root.logger=DEBUG,console. Inside the conf folder, you will find a log4j.properties file in which you can update the logger attributes as required. By default, it will log to a file called flume.log in the logs directory inside the Flume Home Directory. To write to the HDFS, all we need to do is amend our Sink as follows:


# Update the Sink to write to your HDFS
vi conf/flume-twitter.conf

    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'TwitterAgent'

    # Flume Instance - Twitter Agent
    TwitterAgent.sources = Twitter
    TwitterAgent.channels = MemChannel
    TwitterAgent.sinks = HDFS

    ...

    # Sink Configuration - HDFS Sink
    TwitterAgent.sinks.HDFS.type = hdfs
    TwitterAgent.sinks.HDFS.channel = MemChannel

    # Ensure that the owner of the Flume Agent has permission to write to this HDFS Directory
    TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:9000/user/flume/twitter/data/%Y/%m/%d/%H/
    TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
    TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
    TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
    TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
    TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

    # Because we have used %Y/%m/%d/%H formatting in our HDFS Path
    # we must specify where Flume should get the timestamp of each event from.
    # To use the local time of the destination i.e. the HDFS Sink, we can use
    # TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true
    # Alternatively, a timestamp Source interceptor adds a timestamp header to each event
    # (the time at which the Source processed the event) which the HDFS Sink will then use
    TwitterAgent.sources.Twitter.interceptors = interceptor1
    TwitterAgent.sources.Twitter.interceptors.interceptor1.type = timestamp

# Ensure that the Hadoop Libraries are on the Flume Classpath so that Flume knows how to access and write to your HDFS
cp conf/flume-env.sh.template conf/flume-env.sh
vi conf/flume-env.sh

    # Update the Flume Classpath to point to the correct directories on your system
    # Alternatively, copy the Hadoop Common and HDFS libraries to the Flume lib folder
    FLUME_CLASSPATH="$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/hdfs/*"

# Launch the Flume Agent to sink to the HDFS
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf


Note that we no longer need to explicitly define the Root Logger in our launch command, as Flume will automatically read log4j.properties and flume-env.sh from the conf directory. If all goes well, tweets will be written in near real-time to your HDFS at the path you specified above.
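
To illustrate how the %Y/%m/%d/%H escape sequences in the HDFS path turn an event timestamp into a time-bucketed directory, here is a minimal sketch in plain Java. It is not how Flume implements it: the class HdfsPathSketch and its resolve method are hypothetical, only the four escapes used above are handled, and I pass the time zone in explicitly (UTC in the example) so that the result is deterministic.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative only: expands a year/month/day/hour bucket from an event timestamp,
// in the spirit of the %Y/%m/%d/%H escapes used in the HDFS Sink path.
class HdfsPathSketch {

    // yyyy/MM/dd/HH corresponds to %Y/%m/%d/%H (four-digit year, two-digit month, day and hour).
    private static final DateTimeFormatter BUCKET = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    // basePath and zone are assumptions of this sketch, not Flume settings.
    static String resolve(String basePath, long timestampMillis, ZoneId zone) {
        String bucket = BUCKET.withZone(zone).format(Instant.ofEpochMilli(timestampMillis));
        return basePath + "/" + bucket + "/";
    }

    public static void main(String[] args) {
        // 1496923200000 ms since the epoch is midday UTC on 8 June 2017.
        long timestampHeader = 1496923200000L;
        System.out.println(resolve("/user/flume/twitter/data", timestampHeader, ZoneOffset.UTC));
    }
}
```

Every event whose timestamp falls within the same hour resolves to the same directory, which is why the Sink needs a timestamp for each event, whether it comes from an interceptor or from the local clock of the Sink.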



To learn more about Apache Flume, including creating complex chained data flows and its other Source, Channel and Sink types, please refer to the Apache Flume Documentation.


Apache Kafka

So far we have built a data pipeline to ingest simple event-based data i.e. tweets, into our system, writing to either a log file, the console or to your HDFS. Our goal is to get them to Apache Spark so that we can parse them in near real-time, and eventually build predictive models in Spark (which I will cover in the next article). The next technology that we will discuss in our real-time data pipeline is Apache Kafka.

As stated on its website, "Apache Kafka is a distributed streaming platform". So why do we need it in our data pipeline? Well, Kafka is very well suited to building real-time streaming data pipelines, like the one in this article, that move data between systems in a reliable, scalable and fault-tolerant way. Let us explore some of the core concepts behind Apache Kafka before trying to integrate it into our data pipeline. The official Apache Kafka Documentation is a great introduction to Apache Kafka, so I strongly recommend that you read that in the first instance. If you are already familiar with publish-subscribe messaging systems, some of the concepts may be familiar to you, but I would nevertheless recommend reading the official documentation first.



  • Messages - Messages are byte arrays in which objects can be stored, such as String and JSON objects.
  • Topics - Topics are essentially categories of messages. You may have a Topic to store our tweets. You may have another Topic to store Alerts. Kafka stores streams of records in these Topics. Each Topic is made up of one or more Partitions, where a Partition is an ordered and immutable sequence of records that is continually appended to. As Kafka is a distributed system, it is these Partitions that allow a Topic to be spread across the nodes in the Kafka cluster: a Topic can have many Partitions, with each individual Partition fitting wholly onto its serving node. Partitions may also be replicated across the nodes in the cluster.
  • Producers - Producers write/publish data to Topics. The Producer chooses which record to assign to which Partition within a Topic. By attaching a key to the Messages, a Producer can guarantee that all Messages with the same key will be published to the same Partition within a Topic. Kafka guarantees that Messages sent by a Producer to a particular Topic Partition will be appended in the order that they are sent.
  • Consumers - Consumers read (i.e. consume) from Topics. A Topic can have zero, one or many Consumers subscribed to it; in other words, Topics in Kafka are always multi-subscriber. When consuming from a Topic, a Consumer Group can be configured with multiple Consumers, and each record published to the Topic is delivered to one Consumer within each subscribing Consumer Group, with all Messages with the same key arriving at the same Consumer. If all Consumers belong to the same Consumer Group, records will effectively be load-balanced over the Consumer instances, as each Consumer instance will read messages from a unique subset of the Partitions in each Topic that it subscribes to. If all Consumers belong to different Consumer Groups, then each record will be broadcast to all Consumer instances. Kafka guarantees that a Consumer will see the records of a given Partition in the order in which they are stored in that Partition; ordering across the different Partitions of a Topic is not guaranteed.
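
The two routing ideas above, keyed Messages always landing on the same Partition and the Partitions of a Topic being shared out between the Consumers of one Consumer Group, can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: the class KafkaRoutingSketch and its methods are hypothetical, String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the serialised key, and the round-robin assignment is just one simple strategy chosen for clarity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of Kafka routing; this is not the Kafka client API.
class KafkaRoutingSketch {

    // Producer side: the same key always maps to the same partition.
    // Simplification: String.hashCode() replaces the murmur2 hash of the key bytes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Consumer side: within ONE consumer group, each partition is owned by exactly
    // one consumer. Round-robin is used here purely for illustration.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two sends with the same key are routed to the same partition.
        System.out.println(partitionFor("user-42", 2) == partitionFor("user-42", 2));
        // Four partitions shared between two consumers in the same group.
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2"), 4));
    }
}
```

Note that if a group has more Consumers than the Topic has Partitions, some Consumers are assigned nothing, which is why the number of Partitions puts an upper bound on the parallelism of a single Consumer Group.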


What makes Kafka great is that it is a distributed, reliable and fault-tolerant streaming platform. That is, for a Topic with a replication factor of N, up to N-1 server failures can be tolerated without losing any records committed to the log. Furthermore, as each Partition is an ordered sequence of records that is only ever appended to, each message in a Partition is assigned a unique, sequential offset. Kafka does not track which messages have been read by which Consumers in order to retain only the unread ones; instead, it retains all messages for a configurable amount of time, whether or not they have been consumed. Therefore, Kafka can easily handle large numbers of Consumer instances, as it is up to the Consumers themselves to track their read positions, and it can store large amounts of data very well with low latency, high performance and added replication.
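
The offset model described above can be sketched as an append-only list in which reading never removes anything and each Consumer simply remembers how far it has got. The class PartitionLogSketch below is a hypothetical, single-threaded illustration in plain Java; it does not model retention, replication or anything else that a real Kafka broker does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative model of one Partition as an append-only log; not Kafka code.
class PartitionLogSketch {

    private final List<String> records = new ArrayList<>();

    // Appends a record and returns the offset that was assigned to it.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Returns every record from the given offset onwards. Reading does not
    // remove anything, so many consumers can read the same records.
    List<String> readFrom(long offset) {
        int start = (int) Math.max(0L, offset);
        if (start >= records.size()) {
            return Collections.emptyList();
        }
        return new ArrayList<>(records.subList(start, records.size()));
    }

    // The offset that the next appended record will receive.
    long endOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        PartitionLogSketch partition = new PartitionLogSketch();
        partition.append("tweet-a");
        partition.append("tweet-b");
        partition.append("tweet-c");

        // Each consumer tracks its own position; the log itself keeps no per-consumer state.
        long slowConsumerPosition = 0L;
        long fastConsumerPosition = 2L;
        System.out.println("slow consumer reads " + partition.readFrom(slowConsumerPosition));
        System.out.println("fast consumer reads " + partition.readFrom(fastConsumerPosition));
    }
}
```

Because the position lives with the Consumer rather than with the log, adding another Consumer costs the log nothing, and a Consumer can rewind to an earlier offset to reprocess records for as long as they are retained.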

As well as working as a replacement for traditional message brokers, Kafka allows us to construct low-latency data pipelines as we can develop Consumers to subscribe to real-time event-based data in a reliable and fault-tolerant way, which is especially useful for critical real-time data where delivery of data must be guaranteed for integration and onward purposes. Whilst our example of processing tweets in real-time is not exactly critical, deploying Kafka into our real-time data pipeline now can teach us valuable lessons for when we are implementing mission-critical real-time production systems for our clients.


Kafka Channel

So how do we integrate Apache Kafka into our fledgling data pipeline? Well, we will be implementing the following design:

  • Producer - We will use the Flume Twitter Source to collect and then publish our tweets directly to a Kafka Channel removing the need for additional buffering
  • Topic - We will configure a single Topic to store our tweets in Kafka
  • Consumer - We will consume the messages in the Kafka Channel using the Spark Streaming API and its integration with Kafka


Before we proceed, let's take a moment to discuss the advantages and disadvantages of this approach. As I described earlier, Flume Channels are buffers that sit between Flume Sources and Flume Sinks, allowing the Sources to collect data without worrying about the Sinks, which are potentially operating at different rates anyhow. Furthermore, writing to and reading from Channels is achieved using Transactions - only after the transaction is committed will the batch of events within that Transaction be available to the Sinks. Flume supports a variety of Channels out-of-the-box:

  • Memory Channel - Events are stored in an in-memory queue. The advantage of using the Memory Channel is that it supports very high throughput as all the data is held in memory. However, for mission-critical events where delivery must be guaranteed, it should not be used as data is not persisted. Therefore in the event of the Agent failing, you may lose data.
  • File Channel - The File Channel writes all events to disk and is designed to be highly concurrent - it can handle multiple Sources and Sinks concurrently. If data loss cannot be tolerated and data durability and recoverability are important, the File Channel may be an option at the expense of performance.


Other out-of-the-box Channels include JDBC Channels (events are persisted to a Database) and Spillable Memory Channels (events are stored in-memory and disk). Flume also supports Kafka Channels where events are stored in a Kafka cluster, providing high availability, reliability and replication. In our case, the advantage of using a Kafka Channel directly from our Flume Twitter Source is that there is no need for additional buffering, it increases the reliability of our data pipeline and by using Source Interceptors with no explicit Sink, it allows us to write events into a Kafka Topic for use by other applications, like our eventual Spark Streaming Application. Depending on your requirements, you may design your Flume Agent and data pipeline differently. For example, if high performance is a requirement and the data is not mission-critical, you may decide to write events to a Topic using a Kafka Sink instead.
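
The transactional behaviour of Channels described earlier, where a batch of events only becomes visible to the Sinks once its Transaction has been committed, can be sketched as follows. This is a deliberately simplified, single-threaded model in plain Java: the class TransactionalChannelSketch and its methods are hypothetical and do not correspond to Flume's own Channel and Transaction interfaces.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative single-threaded model of a transactional channel; not Flume code.
class TransactionalChannelSketch {

    private final Deque<String> committed = new ArrayDeque<>(); // visible to the Sink
    private final List<String> pending = new ArrayList<>();     // staged by the Source

    // Stages an event inside the current transaction; the Sink cannot see it yet.
    void put(String event) {
        pending.add(event);
    }

    // Makes the whole staged batch visible to the Sink in one step.
    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    // Discards the staged batch, for example after a failure in the Source.
    void rollback() {
        pending.clear();
    }

    // Takes the next committed event, or returns null if none is available.
    String take() {
        return committed.poll();
    }

    int visibleSize() {
        return committed.size();
    }

    public static void main(String[] args) {
        TransactionalChannelSketch channel = new TransactionalChannelSketch();
        channel.put("tweet-1");
        channel.put("tweet-2");
        System.out.println("visible before commit: " + channel.visibleSize());
        channel.commit();
        System.out.println("visible after commit: " + channel.visibleSize());
    }
}
```

The point of the sketch is the all-or-nothing hand-over: the Sink never observes half of a batch, and a rolled-back batch leaves the Channel exactly as it was.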

Now that we have covered some of the theory, let's get started implementing the rest of our data pipeline. As before, I will be using a single CentOS 7 Minimal Installation server to perform the commands below. In a production environment you would have a multi-node cluster, but for the sake of simple development purposes I will be using a single-node cluster. Note that ZooKeeper is out of scope for this article, however I will be coming back to it at a later date. For now, just think of ZooKeeper as a centralised service for configuration (e.g. bootstrapping cluster configuration from a central source) and distributed cluster management (e.g. node status in real-time) - that is Kafka uses ZooKeeper to help form its cluster of producer, consumer and broker nodes.


# Unpack the Kafka Binary to a directory of choice
tar -xzf kafka_2.12-0.10.2.1.tgz

# Basic Configuration of the internal ZooKeeper Service
vi config/zookeeper.properties

    # Absolute path to the ZooKeeper Data Directory and Client Port
    dataDir=<ZooKeeper Data Directory>
    clientPort=2181

# Basic Configuration of the Kafka Server
vi config/server.properties

    # Kafka Server Log Directory and ZooKeeper Co-ordinator Service
    log.dirs=<Kafka Log Directory>
    zookeeper.connect=<Hostname>:2181

# Start the internal ZooKeeper Service
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# Start the Kafka Server
bin/kafka-server-start.sh config/server.properties

# Create the Twitter Topic for our Tweets
bin/kafka-topics.sh --create --zookeeper <ZooKeeper Hostname>:2181 --replication-factor 1 --partitions 2 --topic twitter
Created topic "twitter".


Now that Kafka is up and running and we have created our Twitter Topic, we need to update our Flume Agent to write to the Kafka Channel. For now, we will keep the Flume Logger Sink outputting to the Console so that we can confirm that our newly designed Flume Agent is still working. In the next section of this article, we will discuss Apache Spark and how to configure it to read from the Kafka Channel.


# Update the Flume Agent to act as the Kafka Producer and Consumer
cd $FLUME_HOME
vi conf/flume-twitter.conf

    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'TwitterAgent'

    # Flume Instance - Twitter Agent
    TwitterAgent.sources = Twitter
    TwitterAgent.channels = Kafka
    TwitterAgent.sinks = Logger

    # Source Configuration - Inbuilt TwitterSource
    TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
    TwitterAgent.sources.Twitter.channels = Kafka
    TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
    TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
    TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
    TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
    TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>
    TwitterAgent.sources.Twitter.interceptors = interceptor1
    TwitterAgent.sources.Twitter.interceptors.interceptor1.type = timestamp

    # Channel Configuration - Kafka Channel
    TwitterAgent.channels.Kafka.type = org.apache.flume.channel.kafka.KafkaChannel
    TwitterAgent.channels.Kafka.capacity = 10000
    TwitterAgent.channels.Kafka.transactionCapacity = 100
    TwitterAgent.channels.Kafka.brokerList = <Kafka Broker Hostname>:9092
    TwitterAgent.channels.Kafka.topic = twitter
    TwitterAgent.channels.Kafka.zookeeperConnect = <ZooKeeper Hostname>:2181
    TwitterAgent.channels.Kafka.parseAsFlumeEvent = true

    # Sink Configuration - Logger Sink
    TwitterAgent.sinks.Logger.type = logger
    TwitterAgent.sinks.Logger.channel = Kafka

# Launch the Flume Agent using the Kafka Channel and sinking to the Console
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf -Dflume.root.logger=DEBUG,console


When you run the final command above, a lot of output will be produced in your console, including the tweets as before. If you would like to explicitly check that Kafka is receiving the messages from the Flume Producer, we can run a Kafka Consumer via the command line to subscribe to the Twitter Topic and display all messages from the beginning with a header.


# Run a Kafka Consumer subscribed to the Twitter Topic
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --zookeeper <ZooKeeper Hostname>:2181 --topic twitter --from-beginning



Apache Spark

We have now set up a Flume Agent to collect tweets using our Twitter Application, publish those tweets to a Kafka Topic and finally consume those tweets from the Kafka Channel and sink them to a Logger or the HDFS. The last section of this article will describe how we can connect Apache Spark to Kafka to consume the tweets from Kafka using the Spark Streaming API. (My next article will describe how we can then use Apache Spark to perform some analytics and build predictive models using our real-time stream of tweets!)

Apache Spark is a big-data processing engine and cluster-computing framework capable of performing large-scale data processing and analytics, including Machine Learning, in both batch and real-time. Spark Streaming extends the core Spark API by letting us reuse the same code for batch processing on real-time streaming data flows, hence allowing us to perform real-time data processing and analytics. As always, the official Apache Spark Documentation is the best place to start to get up to speed with Apache Spark.



  • Resilient Distributed Dataset - Resilient Distributed Datasets, or RDDs, are the basic in-memory data structures in Spark - each one is an immutable distributed collection of objects divided into logical partitions across the nodes in a Spark cluster. Spark hides the underlying partitioning and can thus expose a higher-level API with which developers can perform typical operations on RDDs, such as maps and filters, using Java, Scala or Python without worrying (too much!) about the underlying framework. Because RDDs are immutable, transformations on RDDs, such as maps, create new datasets from an existing one, whereas actions return a value. All transformations are lazy, meaning that the results are not computed immediately and are only computed when an action requires them. RDDs are also cacheable, meaning that their data can be persisted in memory or to other mediums such as disk. RDDs were designed, in part, to efficiently run iterative algorithms, such as Machine Learning and Graph algorithms, and data mining.
  • Spark Context - The Spark Context is the core of a Spark application, allowing it to access the Spark cluster through a Resource Manager such as Mesos, YARN or Apache Spark's own cluster manager.
  • Driver Program - The Spark Driver is a program that declares the series of transformations and actions to undertake on RDDs of data and hosts the Spark Context for a Spark Application. The Driver Program splits a Spark application into Tasks which are then scheduled and co-ordinated by the Driver Program.
  • Workers - Workers/Slaves are Spark instances which execute the Tasks scheduled by the Driver Program in a thread pool.
  • Executors - Executors are the agents that actually execute Tasks and reside in Workers, reporting heartbeats and metrics for active Tasks.
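
The idea that transformations are lazy and only run when an action is called can be demonstrated without a Spark cluster, because the standard Java Streams API follows the same principle. The sketch below is an analogy only: it uses java.util.stream rather than the Spark API, and the class LazyEvaluationSketch and the MAP_CALLS counter are hypothetical names introduced purely to observe when the work actually happens.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java Streams, not Spark RDDs, used to show lazy evaluation.
class LazyEvaluationSketch {

    // Counts how many times the map function has actually been executed.
    static final AtomicInteger MAP_CALLS = new AtomicInteger();

    // Declares a filter and a map (the "transformations") without running them.
    static Stream<Integer> declarePipeline(List<String> tweets) {
        return tweets.stream()
                .filter(tweet -> tweet.contains("#"))
                .map(tweet -> {
                    MAP_CALLS.incrementAndGet();
                    return tweet.length();
                });
    }

    public static void main(String[] args) {
        List<String> tweets = Arrays.asList("#ge2017 polls open", "no hashtag here", "#election night");

        Stream<Integer> pipeline = declarePipeline(tweets);
        System.out.println("map calls after declaring: " + MAP_CALLS.get());

        // collect() is the "action": only now are the filter and map executed.
        List<Integer> lengths = pipeline.collect(Collectors.toList());
        System.out.println("map calls after the action: " + MAP_CALLS.get());
        System.out.println("lengths: " + lengths);
    }
}
```

In Spark the same separation lets the engine see the whole chain of transformations before running anything, so it can plan the work across the cluster rather than materialising every intermediate dataset.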


For the remainder of this article, we will be concentrating on the Spark Streaming API and its core components.



  • Discretized Streams - As well as batch processing, Spark supports processing of real-time data streams, such as our Twitter stream, via its high-throughput, scalable and fault-tolerant Streaming API, an extension of the core Spark API. A Discretized Stream, or DStream, represents a continuous stream of data and can be created from input data streams such as the Flume and Kafka streams that were introduced above. The input data streams are divided into batches (internally, a DStream is a continuous series of RDDs) which are then processed by the Spark engine (allowing us to reuse code developed for batch processing for near real-time processing) to generate the final stream of results, also in batches. The final stream of results can then be published into another Kafka Topic (allowing us to create further complex chained data flows) or persisted to HDFS or databases for future analysis.
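
To show what dividing a stream into batches means in practice, the following plain Java sketch groups events into fixed batch intervals according to the time at which they arrived. It is only a model of the idea and not the Spark Streaming API: the class MicroBatchSketch, its Event type and the discretize method are hypothetical, and each list in the result simply stands in for the RDD that Spark would create for that interval.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of micro-batching; not the Spark Streaming API.
class MicroBatchSketch {

    static final class Event {
        final long arrivalMillis;
        final String payload;

        Event(long arrivalMillis, String payload) {
            this.arrivalMillis = arrivalMillis;
            this.payload = payload;
        }
    }

    // Groups events by the start time of the batch interval in which they arrived.
    // Each value stands in for the RDD that would be produced for that interval.
    static Map<Long, List<String>> discretize(List<Event> events, long batchIntervalMillis) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (Event event : events) {
            long batchStart = (event.arrivalMillis / batchIntervalMillis) * batchIntervalMillis;
            batches.computeIfAbsent(batchStart, start -> new ArrayList<>()).add(event.payload);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0L, "tweet-a"),
                new Event(500L, "tweet-b"),
                new Event(1200L, "tweet-c"),
                new Event(2500L, "tweet-d"));
        // With a one-second batch interval, the four events fall into three batches.
        System.out.println(discretize(events, 1000L));
    }
}
```

The batch interval is the main tuning knob in this model: a shorter interval lowers latency but produces more, smaller batches for the engine to schedule.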


To finish this article, we are going to integrate Apache Spark with Kafka using Spark's Streaming API to consume the tweets from Kafka. Obviously this does not illustrate the full analytical power of Spark, but in my next article we will build upon our real-time data pipeline by performing analytics on the stream of tweets in Spark!

To start, we are going to configure and deploy a very basic single-node Spark cluster using Spark's own cluster manager and our CentOS 7 Minimal Installation Server as before.


# Unpack the Spark Binary to a directory of choice
tar -xzf spark-2.1.0-bin-hadoop2.7.tgz

# Basic Single-Node Spark Deployment Environment Configuration
# Note that Spark properties can be set directly using SparkConf passed to your SparkContext
# You can also specify relevant default configuration values using conf/spark-defaults.conf as follows
vi conf/spark-defaults.conf

    # Spark Deployment Environment Properties
    # In this example, we will be using Spark's own cluster manager
    spark.master                            spark://<Cluster Manager Hostname>:7077
    spark.driver.cores                      1
    spark.driver.maxResultSize              1g
    spark.driver.memory                     1g
    spark.executor.memory                   2g
    spark.local.dir                         <Spark Scratch Directory>

    # Networking Properties
    spark.driver.host                       <Driver Hostname>
    spark.network.timeout                   120s

    # Hadoop Properties
    spark.hadoop.dfs.replication            1

# Start the Spark Master
sbin/start-master.sh

# Start a Spark Slave
sbin/start-slave.sh spark://<Cluster Manager Hostname>:7077


You can access the Spark Master UI at <Cluster Manager Hostname>:8080 (default) to confirm that your Single-Node Spark cluster is up and running with 1 worker. Now that we have our very basic Single-Node Spark Cluster deployed, let us update our Flume Agent with the Twitter Source and Interceptor but no sink using the Kafka Channel to write events into a Kafka Topic for Apache Spark to consume.


# Update the Flume Agent to use the Kafka Channel with no Sink so that Spark can consume the events from the Kafka Topic
cd $FLUME_HOME
vi conf/flume-twitter.conf

    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'TwitterAgent'

    # Flume Instance - Twitter Agent
    TwitterAgent.sources = Twitter
    TwitterAgent.channels = Kafka

    # Source Configuration - Inbuilt TwitterSource
    TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
    TwitterAgent.sources.Twitter.channels = Kafka
    TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
    TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
    TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
    TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
    TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>

    # Channel Configuration - Kafka Channel
    TwitterAgent.channels.Kafka.type = org.apache.flume.channel.kafka.KafkaChannel
    TwitterAgent.channels.Kafka.capacity = 10000
    TwitterAgent.channels.Kafka.transactionCapacity = 100
    TwitterAgent.channels.Kafka.brokerList = <Kafka Broker Hostname>:9092
    TwitterAgent.channels.Kafka.topic = twitter
    TwitterAgent.channels.Kafka.zookeeperConnect = <ZooKeeper Hostname>:2181
    TwitterAgent.channels.Kafka.parseAsFlumeEvent = true


We are now ready to write our Spark Streaming Application to read tweets from Kafka and perform some simple processing on them in Spark. As I described above, Spark exposes a higher-level API so that developers can write their Spark applications in Java, Scala or Python. As a Java Developer I will naturally choose Java or Scala where I can!

Spark Streaming Application

Spark Streaming supports two approaches to reading data from Kafka:

  • Receiver-based Approach - Receiver tasks running inside Executors on the Workers receive the data, which is then processed by jobs launched by the Spark Streaming Context. Under the default configuration this approach can lose data when a node fails, unless Write Ahead Logs are additionally enabled.
  • Direct Approach - Spark periodically queries Kafka for the latest offsets in each Topic and Partition, and uses them to define the range of offsets to read from Kafka in each batch. This approach offers better reliability and efficiency, since it needs neither receivers nor Write Ahead Logs.
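
To make the idea of offset ranges more concrete, here is a minimal sketch in plain Java of how a batch could be defined from the offsets already consumed and the latest offsets reported by Kafka. It is only an illustration of the concept and not Spark's actual implementation: the class OffsetRangeSketch, its nested OffsetRange type and the nextBatch method are hypothetical names, and reading a previously unseen partition from offset 0 is an assumption made for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustrative model of per-batch offset ranges in a direct stream.
 * Hypothetical names throughout; this is not Spark's implementation.
 */
final class OffsetRangeSketch {

    /** Half-open range [fromOffset, untilOffset) within one topic partition. */
    static final class OffsetRange {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        OffsetRange(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        /** Number of messages covered by this range. */
        long count() {
            return untilOffset - fromOffset;
        }

        @Override
        public String toString() {
            return topic + "-" + partition + " [" + fromOffset + ", " + untilOffset + ")";
        }
    }

    /**
     * Builds the ranges for the next batch and advances the consumed offsets.
     *
     * @param consumed next offset to read per partition (updated in place)
     * @param latest   latest available offset per partition, as reported by Kafka
     */
    static List<OffsetRange> nextBatch(String topic, Map<Integer, Long> consumed,
            Map<Integer, Long> latest) {
        List<OffsetRange> ranges = new ArrayList<>();
        // Iterate in partition order so that the result is deterministic
        Map<Integer, Long> ordered = new TreeMap<>(latest);
        for (Map.Entry<Integer, Long> entry : ordered.entrySet()) {
            int partition = entry.getKey();
            long until = entry.getValue();
            // Assumption: a partition we have not seen before is read from offset 0
            long from = consumed.getOrDefault(partition, 0L);
            ranges.add(new OffsetRange(topic, partition, from, until));
            // The next batch starts where this one stops
            consumed.put(partition, until);
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<Integer, Long> consumed = new HashMap<>();
        consumed.put(0, 100L);
        consumed.put(1, 40L);

        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 150L);
        latest.put(1, 40L);

        for (OffsetRange range : nextBatch("twitter", consumed, latest)) {
            System.out.println(range + " -> " + range.count() + " message(s)");
        }
    }
}
```

In this example, partition 0 yields a range of 50 messages while partition 1 yields an empty range because nothing new has arrived. Since every batch is fully described by its ranges, a failed batch can simply be re-read from Kafka.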


We will be using the Direct Approach to consume messages from Kafka and process them in Spark. Since I will be writing my Spark Streaming Application in Java, I will first create an Apache Maven project to handle all the build dependencies. I will configure Maven to build a fat JAR, meaning that the final JAR that we submit to Apache Spark will include not only our Spark Streaming Application but also all the dependencies that it needs to run. A few things to note in my POM file below are:

  • Spark Streaming Kafka - The dependency spark-streaming-kafka-0-8_2.11 is required in order to connect our Spark Streaming Application with Kafka
  • Provided Dependencies - spark-core_2.11 and spark-streaming_2.11 are already provided in the Spark installation, so we mark these as provided in our POM
  • Maven Assembly Plugin - We will be using the Maven Assembly Plugin to build our fat JAR including all the required dependencies



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Maven Coordinates -->
    <groupId>io.keisan.knowledgebase.spark.streaming</groupId>
    <artifactId>keisan-spark-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <!-- Project Information -->
    <name>Spark Kafka Direct Integration</name>
    <description>Keisan Knowledgebase Spark Streaming Project - Kafka Direct Integration</description>
    <url>https://www.keisan.io/knowledgebase</url>
    <organization>
        <name>Keisan Ltd</name>
        <url>https://www.keisan.io</url>
    </organization>
    <developers>
        <developer>
            <id>jillur.quddus</id>
            <name>Jillur Quddus</name>
            <email>contactus@keisan.io</email>
            <url>https://www.keisan.io</url>
            <organization>Keisan Ltd</organization>
            <organizationUrl>https://www.keisan.io</organizationUrl>
            <roles>
                <role>Lead Engineer</role>
                <role>Data Scientist</role>
            </roles>
            <timezone>Europe/London</timezone>
        </developer>
    </developers>

    <!-- Repositories -->
    <repositories>

        <!-- Confluent Repository for KafkaAvroDecoder -->
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>

    </repositories>

    <!-- Properties -->
    <properties>
        <apache.avro.version>1.8.1</apache.avro.version>
        <apache.spark.core.2.11.version>2.1.0</apache.spark.core.2.11.version>
        <apache.spark.streaming.2.11.version>2.1.0</apache.spark.streaming.2.11.version>
        <apache.spark.streaming.kafka-0-8_2.11.version>2.1.0</apache.spark.streaming.kafka-0-8_2.11.version>
        <confluent.kafka.avro.serializer.version>3.2.1</confluent.kafka.avro.serializer.version>
        <jdk.version>1.8</jdk.version>
        <maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
        <maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
        <output.directory>/keisan/knowledgebase/spark/jars</output.directory>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <!-- Dependencies -->
    <dependencies>

        <!-- Apache Avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${apache.avro.version}</version>
        </dependency>

        <!-- Apache Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${apache.spark.core.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${apache.spark.streaming.2.11.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${apache.spark.streaming.kafka-0-8_2.11.version}</version>
        </dependency>

        <!-- Confluent Kafka Avro Serializer -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.kafka.avro.serializer.version}</version>
        </dependency>

    </dependencies>

    <!-- Build -->
    <build>

        <!-- Plugins -->
        <plugins>

            <!-- Maven Compiler: Compile the Sources of the Project -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.plugins.maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Assembly: Aggregate project output with its dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.plugins.maven-assembly-plugin.version}</version>
                <configuration>

                    <!-- Final JAR Filename -->
                    <finalName>keisan-spark-kafka-${project.version}</finalName>

                    <!-- Include all Project Dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>

                    <!-- JAR with dependencies Output Target Directory -->
                    <outputDirectory>${output.directory}</outputDirectory>

                </configuration>
                <executions>

                    <!-- Bind the assembly to the package phase -->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

</project>


We are now ready to start developing our Spark Streaming Application. To create an input DStream, we need to import KafkaUtils and create a Direct Stream using a Spark Streaming Context, the hostname and port that the Kafka Broker is listening on, and the Kafka Topic from which we want to consume messages.

Simple Plain-Text Processing

In the example below, we simply consume the tweets from Kafka, decode the values as Strings and output them to the Spark Executor Standard Out.


package io.keisan.knowledgebase.spark.streaming.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

/**
 * Example Spark Streaming Application with Kafka Direct Integration
 *
 * Periodically query Kafka for the latest offsets in each Topic and Partition.
 * Consume the tweets using a String Decoder and display them in the console.
 *
 * Usage: {@code StreamingKafkaDirectStringDecoder <broker>}
 *  broker: The hostname and port at which the Kafka Broker is listening
 *
 * @author jillur.quddus
 * @version 0.0.1
 *
 */

public class StreamingKafkaDirectStringDecoder {

    public static void main(String[] args) throws InterruptedException {

        if ( args.length != 1 ) {
            System.err.println("Usage: StreamingKafkaDirectStringDecoder <broker>");
            System.exit(1);
        }

        // Create a Java Streaming Context with a Batch Interval of 5 seconds
        SparkConf conf = new SparkConf().setAppName("Kafka Direct String Decoder");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Specify the Kafka Broker Options and set of Topics
        String broker = args[0];
        Map<String, String> kafkaParameters = new HashMap<String, String>();
        kafkaParameters.put("metadata.broker.list", broker);
        Set<String> topics = Collections.singleton("twitter");

        // Create an input DStream using KafkaUtils and simple plain-text message processing
        JavaPairInputDStream<String, String> kafkaDirectStream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParameters, topics);
        kafkaDirectStream.foreachRDD(rdd -> {
            rdd.foreach(record -> System.out.println(record._2));
        });

        // Start the computation
        jssc.start();

        // Wait for the computation to terminate
        jssc.awaitTermination();

    }

}
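
The application above passes its single argument straight into metadata.broker.list, which expects a comma-delimited list of host:port pairs. As a hedged sketch, the helper below checks that argument before the Streaming Context is created, so that a malformed value fails fast with a clear message. The class KafkaParameters and its forBrokers method are hypothetical names introduced for illustration only and are not part of Spark or Kafka. The check is deliberately simple and does not handle IPv6 addresses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper that validates a broker list before it is handed to KafkaUtils.
 * Not part of Spark or Kafka.
 */
final class KafkaParameters {

    private KafkaParameters() {
    }

    /**
     * Validates a comma-delimited list of host:port pairs and returns the
     * Kafka parameters map expected by the direct stream.
     */
    static Map<String, String> forBrokers(String brokerList) {
        if (brokerList == null || brokerList.trim().isEmpty()) {
            throw new IllegalArgumentException("Broker list must not be empty");
        }

        List<String> brokers = new ArrayList<>();
        for (String broker : brokerList.split(",")) {
            String[] hostAndPort = broker.trim().split(":");
            if (hostAndPort.length != 2 || hostAndPort[0].isEmpty()) {
                throw new IllegalArgumentException("Expected host:port but got '" + broker + "'");
            }
            int port;
            try {
                port = Integer.parseInt(hostAndPort[1]);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in '" + broker + "'", e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + broker + "'");
            }
            brokers.add(hostAndPort[0] + ":" + port);
        }

        // Normalise the list (no stray whitespace) before passing it on
        Map<String, String> kafkaParameters = new HashMap<>();
        kafkaParameters.put("metadata.broker.list", String.join(",", brokers));
        return kafkaParameters;
    }
}
```

With such a helper in place, the kafkaParameters map in the main method could be obtained with KafkaParameters.forBrokers(args[0]) instead of being populated by hand.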


Avro Processing

Note that the Flume Twitter Source used in our data pipeline actually converts the tweets into Avro format and sends these Avro messages downstream. Avro is a schema-based, language-neutral data serialisation system that uses JSON to declare data structures and schemas. Instead of the StringDecoder used in our Streaming Application above, which simply decodes the values as Strings, you could use either the DefaultDecoder, which returns the raw array of bytes for you to decode with Avro's Binary Decoder, or Confluent's KafkaAvroDecoder, which receives messages with Avro records as their values.

Running the Spark Streaming Application

We are now ready to run our Spark Streaming Application! Use Maven to build the fat JAR (for example with mvn clean package, since the assembly is bound to the package phase in the POM above) and submit it to your Spark cluster. Since we are only using a single-node Spark cluster for development and debugging in this article, it makes sense to deploy our application in client mode.


# Submit our Spark Streaming Application
cd $SPARK_HOME
bin/spark-submit --class io.keisan.knowledgebase.spark.streaming.kafka.StreamingKafkaDirectStringDecoder --master spark://<Cluster Manager Hostname>:7077 --deploy-mode client /keisan/knowledgebase/spark/jars/keisan-spark-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar <Kafka Broker Hostname:Port>

# Launch the Flume Agent using the Twitter Source and Kafka Channel
cd $FLUME_HOME
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf


If you now examine the Spark Executor output, you should be able to see the tweets being streamed from Kafka to Spark in near real-time!

We have now successfully developed a near real-time, high-throughput, reliable and fault-tolerant data pipeline. Apache Flume is used to collect the event-based data (tweets in our example), which is published to Apache Kafka. Apache Spark is then used to consume the data from Apache Kafka and perform near real-time data processing. Obviously, simply ingesting and printing data in no way demonstrates the capabilities of Apache Spark! So in my next article, I will be discussing how to build predictive models and perform data analytics in real-time on our stream of event-based data in Spark.
