Keisan

Apache Kafka Producer with Avro Bijection

Building an Apache Kafka Producer for Tweets using Hosebird, Avro and Bijection


In my last article, 'Real-Time Data Pipeline with Apache Kafka and Spark', I used Apache Flume to fetch tweets from the Twitter Stream using the demo Flume Twitter Source that is bundled with Flume out of the box. The demo Twitter Source connects to the Twitter Stream and continuously downloads a sample of tweets, which were then published to a Topic in the Kafka Channel that we set up. In this article, we will write a custom Kafka Producer in Java that uses Twitter's Hosebird Client to collect tweets from Twitter's Streaming API, then parses and converts them to Avro format before publishing them to a Topic in Apache Kafka. The high-level real-time data pipeline therefore runs from Twitter's Streaming API, through the Hosebird Client and our custom Kafka Producer, to a Topic in Apache Kafka.



Apache Maven

First, let us set up our Apache Maven project to handle the build dependencies for our Kafka Producer, including Twitter's Hosebird Client.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Maven Coordinates -->
    <groupId>io.keisan.knowledgebase.kafka.producers</groupId>
    <artifactId>keisan-kafka-producers</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <!-- Project Information -->
    <name>Example Kafka Producers</name>
    <description>Keisan Knowledgebase Kafka Producers Project - Example Kafka Producers</description>
    <url>https://www.keisan.io/knowledgebase/real-time-predictive-modelling</url>
    <organization>
        <name>Keisan Ltd</name>
        <url>https://www.keisan.io</url>
    </organization>
    <developers>
        <developer>
            <id>jillur.quddus</id>
            <name>Jillur Quddus</name>
            <email>contactus@keisan.io</email>
            <url>https://www.keisan.io</url>
            <organization>Keisan Ltd</organization>
            <organizationUrl>https://www.keisan.io</organizationUrl>
            <roles>
                <role>Lead Engineer</role>
                <role>Data Scientist</role>
            </roles>
            <timezone>Europe/London</timezone>
        </developer>
    </developers>

    <!-- Properties -->
    <properties>
        <apache.avro.version>1.8.1</apache.avro.version>
        <apache.kafka.2.11.version>0.8.2.2</apache.kafka.2.11.version>
        <bijection.avro.2.11.version>0.9.5</bijection.avro.2.11.version>
        <jdk.version>1.8</jdk.version>
        <maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
        <maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
        <output.directory>/keisan/knowledgebase/kafka/jars</output.directory>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <twitter.hbc.core.version>2.2.0</twitter.hbc.core.version>
        <twitter.hbc.twitter4j.version>2.2.0</twitter.hbc.twitter4j.version>
    </properties>

    <!-- Dependencies -->
    <dependencies>

        <!-- Apache Avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${apache.avro.version}</version>
        </dependency>

        <!-- Apache Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${apache.kafka.2.11.version}</version>
        </dependency>

        <!-- Bijection -->
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.11</artifactId>
            <version>${bijection.avro.2.11.version}</version>
        </dependency>

        <!-- Twitter Hosebird Client -->
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>hbc-core</artifactId>
            <version>${twitter.hbc.core.version}</version>
        </dependency>
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>hbc-twitter4j</artifactId>
            <version>${twitter.hbc.twitter4j.version}</version>
        </dependency>

    </dependencies>

    <!-- Build -->
    <build>

        <!-- Plugins -->
        <plugins>

            <!-- Maven Compiler: Compile the Sources of the Project -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.plugins.maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Assembly: Aggregate project output with its dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.plugins.maven-assembly-plugin.version}</version>
                <configuration>

                    <!-- Final JAR Filename -->
                    <finalName>keisan-kafka-producers-${project.version}</finalName>

                    <!-- Include all Project Dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>

                    <!-- Main Class -->
                    <archive>
                        <manifest>
                            <mainClass>io.keisan.knowledgebase.kafka.producers.TwitterProducer</mainClass>
                        </manifest>
                    </archive>

                    <!-- JAR with dependencies Output Target Directory -->
                    <outputDirectory>${output.directory}</outputDirectory>

                </configuration>
                <executions>

                    <!-- Bind the assembly to the package phase -->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

</project>


Avro Schema

We are now ready to create our Twitter Producer Java Class. I will start off by defining the Avro schema which will be used to serialize and publish the messages to our Kafka Topic. For the full list of available fields, please refer to https://dev.twitter.com/overview/api/tweets.


    // Avro Schema to use to serialise messages to the Kafka Topic
    // For the full list of Tweet fields, please refer to
    // https://dev.twitter.com/overview/api/tweets
    private static Schema schema;
    private static Injection<GenericRecord, byte[]> recordInjection;
    public static final String TWEET_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"tweet\","
            + "\"fields\":["
            + "  { \"name\":\"id\", \"type\":\"string\" },"
            + "  { \"name\":\"user_name\", \"type\":\"string\" },"
            + "  { \"name\":\"text\", \"type\":\"string\" }"
            + "]}";


The next method that I will define will generate an Avro Record from a data interface representing a single Tweet using our newly defined Avro Schema.


    /**
     * Parse and convert the Tweet Status into an Avro Record for serialising
     * and publishing to the Kafka Topic.
     * @param status The Twitter4j Status representing a single Tweet
     * @return The Avro Record populated from the Status
     */

    private static GenericData.Record createRecord(Status status) {

        User user = status.getUser();
        GenericData.Record doc = new GenericData.Record(schema);
        doc.put("id", String.valueOf(status.getId()));
        doc.put("user_name", user.getName());
        doc.put("text", status.getText());
        return doc;

    }


Twitter Hosebird Client and Twitter4j Wrapper

We can now move on to establishing a connection to the Twitter Streaming API and creating our Kafka Producer. Creating a basic Kafka Producer is straightforward and requires only a handful of properties:

  • Bootstrap Servers - An initial list of Kafka brokers that the Producer uses to discover the rest of the cluster.
  • Key Serializer - The class with which to serialise the Key objects.
  • Value Serializer - The class with which to serialise the Value objects. In our case, we will use the Byte Array Serializer to transmit the Avro messages.
  • Request Acknowledgement - Defines when a Producer request is considered complete, in terms of which replicas must have written the data to their log. 0 means that the Producer never waits for an acknowledgement. 1 means that the Producer gets an acknowledgement after the Leader replica has received the data. -1 means that the Producer gets an acknowledgement after all in-sync replicas have received the data.
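
The properties listed above can be gathered in a small helper. The following is only a sketch that uses nothing beyond the JDK: the class name, the method name and the validation are my own illustrative choices and not part of the Kafka API. The property keys are the ones read by the Java client's KafkaProducer, which takes the acknowledgement level from the "acks" key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ProducerConfigSketch {

    // Acknowledgement levels described in the list above ("all" is equivalent to "-1")
    private static final List<String> VALID_ACKS = Arrays.asList("0", "1", "-1", "all");

    /**
     * Build the Producer properties for a broker list and an acknowledgement level.
     * Illustrative helper: the name and the checks are assumptions, not Kafka API.
     */
    public static Properties buildProducerProperties(String bootstrapServers, String acks) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrap.servers must not be empty");
        }
        if (!VALID_ACKS.contains(acks)) {
            throw new IllegalArgumentException("Unsupported acks value: " + acks);
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("acks", acks);
        return properties;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for a local single-broker setup
        Properties properties = buildProducerProperties("localhost:9092", "1");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The returned Properties object could then be handed to the KafkaProducer constructor. Rejecting an unknown acknowledgement level up front is a design choice of this sketch, so that a typo fails immediately rather than at the first send.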

Note that we will be using Twitter's Bijection Library to serialise and deserialise our Avro messages as it has a more user-friendly API than Avro's native API.

As I mentioned earlier, we will be using Twitter's Hosebird Client to collect tweets from Twitter's Streaming API. To create a Hosebird Client, we define a First In First Out (FIFO) Queue to collect messages from the stream using a Filtered Endpoint allowing us to filter the tweets based on a defined list of terms. Finally we need the Twitter Application Authentication attributes that we generated in the last article to provision us access to the Twitter Stream.
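
The authentication attributes mentioned above are better kept out of the source code. As a minimal sketch, assuming that they live in a properties file, they could be loaded and validated as follows. The class name and the four property keys are hypothetical names chosen for illustration; they are not defined by Hosebird or Twitter.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

public class TwitterCredentialsSketch {

    // Hypothetical property names; choose whatever suits your configuration file
    static final String[] REQUIRED_KEYS = {
        "twitter.consumer.key", "twitter.consumer.secret",
        "twitter.access.token", "twitter.access.token.secret"
    };

    /** Load the four OAuth attributes, failing fast if any is missing or blank. */
    public static Properties load(Reader source) throws IOException {
        Properties credentials = new Properties();
        credentials.load(source);
        for (String key : REQUIRED_KEYS) {
            String value = credentials.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing Twitter credential: " + key);
            }
        }
        return credentials;
    }

    public static void main(String[] args) throws IOException {
        // In practice the Reader would wrap a file kept outside of source control
        String example = "twitter.consumer.key=ck\n"
                + "twitter.consumer.secret=cs\n"
                + "twitter.access.token=at\n"
                + "twitter.access.token.secret=ats\n";
        Properties credentials = load(new StringReader(example));
        System.out.println("Loaded " + credentials.size() + " credentials");
    }
}
```

The four values returned by getProperty would then be passed to the OAuth1 constructor in place of hard-coded constants.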

We will then use the Twitter4j Library to create a Twitter4j Client that will wrap around the Hosebird Client exposing a custom Status Listener. It is within this custom Status Listener that we will parse the Status and generate the Avro Record that we will publish to the Kafka Topic via our Producer. Finally, we will create an Executor Service to spawn Threads to parse the actual messages that we collect - Runnables are submitted to the Executor Service to process messages in the FIFO messaging queue that we defined earlier when we call the process method on our Twitter4j Client Wrapper. This method looks as follows:


    /**
     * Wrap a Twitter4j Client around a Hosebird Client using a custom Status Listener
     * and an Executor Service to spawn threads to parse the messages received
     * @param kafkaBroker
     * @throws InterruptedException
     */

    public static void run(String kafkaBroker) throws InterruptedException {

        // Kafka Producer Properties
        Properties producerProperties = new Properties();

        // Bootstrapping
        producerProperties.put("bootstrap.servers", kafkaBroker);

        // Serializer Class for Keys
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Serializer Class for Values
        producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // When a Produce Request is considered completed
        // (the Java KafkaProducer reads "acks"; "request.required.acks" belongs to the legacy Scala Producer)
        producerProperties.put("acks", "1");

        // Create the Kafka Producer
        producer = new KafkaProducer<>(producerProperties);

        // Twitter Connection and Filtering Properties
        BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>(100000);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.stallWarnings(false);
        endpoint.trackTerms(Lists.newArrayList("brexit", "#vote2017"));
        Authentication authentication = new OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET);

        // Build a Twitter Hosebird Client
        ClientBuilder hosebirdClientBuilder = new ClientBuilder()
                .name("Keisan Knowledgebase Twitter Hosebird Client")
                .hosts(Constants.STREAM_HOST)
                .authentication(authentication)
                .endpoint(endpoint)
                .processor(new StringDelimitedProcessor(messageQueue));
        BasicClient hosebirdClient = hosebirdClientBuilder.build();

        // Create an Executor Service to spawn threads to parse the messages
        // Runnables are submitted to the Executor Service to process the Message Queue
        int numberProcessingThreads = 2;
        ExecutorService service = Executors.newFixedThreadPool(numberProcessingThreads);

        // Wrap a Twitter4j Client around the Hosebird Client using a custom Status Listener
        Twitter4jStatusClient twitter4jClient = new Twitter4jStatusClient(
                hosebirdClient, messageQueue, Lists.newArrayList(statusListener), service);

        // Connect to the Twitter Streaming API
        twitter4jClient.connect();

        // Twitter4jStatusClient.process must be called for every Message Processing Thread to be spawned
        for (int threads = 0; threads < numberProcessingThreads; threads++) {
            twitter4jClient.process();
        }

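        // Run the Producer for 60 seconds for DEV purposes
        // Note that this is NOT a graceful exit
        Thread.sleep(60000);

        // Stop collecting from the Twitter stream first, then close the Producer,
        // which waits for records that have already been sent to complete
        hosebirdClient.stop();
        producer.close();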

    }
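
The pattern used in the run method, a bounded FIFO queue drained by a fixed pool of worker threads, can be tried in isolation without any Twitter or Kafka dependencies. The sketch below is not how Hosebird is implemented internally; it is a JDK-only illustration, and the sentinel value used to stop the workers is my own device for shutting the pool down cleanly, something that the 60-second sleep above does not attempt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueWorkersSketch {

    // Sentinel telling a worker to stop; an illustrative device, not part of Hosebird
    private static final String POISON_PILL = "__STOP__";

    /** Pass the messages through a bounded queue that is drained by a fixed pool of workers. */
    public static List<String> process(List<String> messages, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        List<String> processed = new CopyOnWriteArrayList<>();
        ExecutorService service = Executors.newFixedThreadPool(workers);

        // Each worker blocks on the queue until a message or the sentinel arrives
        for (int i = 0; i < workers; i++) {
            service.execute(() -> {
                try {
                    while (true) {
                        String message = queue.take();
                        if (POISON_PILL.equals(message)) {
                            return;
                        }
                        // Stand-in for parsing the tweet and sending it to Kafka
                        processed.add(message.toUpperCase());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer side: enqueue the messages, then one sentinel per worker
        for (String message : messages) {
            queue.put(message);
        }
        for (int i = 0; i < workers; i++) {
            queue.put(POISON_PILL);
        }

        // Let the workers finish what is queued before returning
        service.shutdown();
        if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
            service.shutdownNow();
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> result = process(Arrays.asList("first tweet", "second tweet", "third tweet"), 2);
        System.out.println("Processed " + result.size() + " messages");
    }
}
```

With more than one worker the order in which messages are processed is not guaranteed, which is equally true of the two processing threads configured in the run method.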


Twitter4j Custom Status Listener

Finally we need to define our custom Status Listener which will be called for every new Tweet. Our custom Status Listener will parse and generate an Avro Record from the Status using the createRecord method that we defined above. It will then use the Bijection Library to create a Byte Array from this Avro Record that our Kafka Producer will then send to Kafka for publishing to a Kafka Topic.


    /**
     * Custom Status Listener
     * The onStatus method gets called for every new Tweet. It is here where we
     * will parse the incoming messages and generate the Avro Record which will be
     * serialised and sent using our Kafka Producer.
     */

    private static StatusListener statusListener = new StatusStreamHandler() {

        @Override
        public void onStatus(Status status) {

            // Convert the Status object into an Avro Record for serialising and publishing to the Kafka Topic
            GenericData.Record avroRecord = createRecord(status);
            byte[] avroRecordBytes = recordInjection.apply(avroRecord);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(KAFKA_TOPIC, avroRecordBytes);

            // Send the Message to Kafka
            producer.send(record);

        }

        @Override
        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}

        @Override
        public void onTrackLimitationNotice(int limit) {}

        @Override
        public void onScrubGeo(long user, long upToStatus) {}

        @Override
        public void onStallWarning(StallWarning warning) {}

        @Override
        public void onException(Exception e) {}

        @Override
        public void onDisconnectMessage(DisconnectMessage message) {}

        @Override
        public void onStallWarningMessage(StallWarningMessage warning) {}

        @Override
        public void onUnknownMessageType(String s) {}

    };
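
To make concrete what the byte array sent to Kafka contains, the following JDK-only sketch hand-encodes a record of our three-string schema using Avro's binary encoding rules: a string is written as its UTF-8 byte length (a zig-zag variable-length long) followed by the bytes, and a record is simply its fields in schema order. I am assuming here that Bijection's toBinary codec emits plain Avro binary without an embedded schema; the class and method names are illustrative only, and in the real Producer this work is done by the Injection.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class AvroBinarySketch {

    /** Write a long as a zig-zag variable-length integer, as Avro does for int and long. */
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80));
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    /** Write a string as its UTF-8 byte length followed by the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Encode a tweet record: the fields in schema order, with no names or delimiters. */
    public static byte[] encodeTweet(String id, String userName, String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, id);
        writeString(out, userName);
        writeString(out, text);
        return out.toByteArray();
    }

    /** Decode the three string fields back out of the byte array. */
    public static List<String> decodeTweet(byte[] bytes) {
        List<String> fields = new ArrayList<>();
        int position = 0;
        for (int field = 0; field < 3; field++) {
            // Read the zig-zag variable-length prefix, 7 bits per byte
            long zigzag = 0;
            int shift = 0;
            int b;
            do {
                b = bytes[position++] & 0xFF;
                zigzag |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            int length = (int) ((zigzag >>> 1) ^ -(zigzag & 1));
            fields.add(new String(bytes, position, length, StandardCharsets.UTF_8));
            position += length;
        }
        return fields;
    }

    public static void main(String[] args) {
        byte[] encoded = encodeTweet("1", "a", "hi");
        System.out.println(encoded.length + " bytes -> " + decodeTweet(encoded));
    }
}
```

Because the field names and types are not part of the payload, a consumer of the Topic needs the same schema in order to decode the messages.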


Kafka Twitter Producer

Putting it all together, our final Kafka Twitter Producer looks as follows:


package io.keisan.knowledgebase.kafka.producers;

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import com.twitter.hbc.twitter4j.Twitter4jStatusClient;
import com.twitter.hbc.twitter4j.handler.StatusStreamHandler;
import com.twitter.hbc.twitter4j.message.DisconnectMessage;
import com.twitter.hbc.twitter4j.message.StallWarningMessage;

import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.User;

/**
 * Example Kafka Twitter Producer
 *
 * Wrap a Twitter4j Client around a Hosebird Client using a custom Status Listener
 * to connect to the Twitter Streaming API. Parse and convert the messages (tweets)
 * to Avro format and publish them to a Kafka Topic.
 *
 * Usage: TwitterProducer <broker>
 *  broker: The hostname and port at which the Kafka Broker is listening
 *
 * @author jillur.quddus
 * @version 0.0.1
 *
 */

public class TwitterProducer {

    // Kafka Producer - note that Kafka Producers are Thread Safe and that sharing a Producer instance
    // across threads is generally faster than having multiple Producer instances
    private static KafkaProducer<String, byte[]> producer;

    // Note that for a Production Deployment, do not hard-code your Twitter Application Authentication Keys
    // Instead, derive from a Configuration File or Context
    private static final String CONSUMER_KEY = "<Your Twitter Application Consumer Key>";
    private static final String CONSUMER_SECRET = "<Your Twitter Application Consumer Secret>";
    private static final String ACCESS_TOKEN = "<Your Twitter Application Access Token>";
    private static final String ACCESS_TOKEN_SECRET = "<Your Twitter Application Access Token Secret>";
    private static final String KAFKA_TOPIC = "twitter";

    // Avro Schema to use to serialise messages to the Kafka Topic
    // For the full list of Tweet fields, please refer to
    // https://dev.twitter.com/overview/api/tweets
    private static Schema schema;
    private static Injection<GenericRecord, byte[]> recordInjection;
    public static final String TWEET_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"tweet\","
            + "\"fields\":["
            + "  { \"name\":\"id\", \"type\":\"string\" },"
            + "  { \"name\":\"user_name\", \"type\":\"string\" },"
            + "  { \"name\":\"text\", \"type\":\"string\" }"
            + "]}";

    /**
     * Wrap a Twitter4j Client around a Hosebird Client using a custom Status Listener
     * and an Executor Service to spawn threads to parse the messages received
     * @param kafkaBroker
     * @throws InterruptedException
     */

    public static void run(String kafkaBroker) throws InterruptedException {

        // Kafka Producer Properties
        Properties producerProperties = new Properties();

        // Bootstrapping
        producerProperties.put("bootstrap.servers", kafkaBroker);

        // Serializer Class for Keys
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Serializer Class for Values
        producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // When a Produce Request is considered completed
        // (the Java KafkaProducer reads "acks"; "request.required.acks" belongs to the legacy Scala Producer)
        producerProperties.put("acks", "1");

        // Create the Kafka Producer
        producer = new KafkaProducer<>(producerProperties);

        // Twitter Connection and Filtering Properties
        BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>(100000);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.stallWarnings(false);
        endpoint.trackTerms(Lists.newArrayList("brexit", "#vote2017"));
        Authentication authentication = new OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET);

        // Build a Twitter Hosebird Client
        ClientBuilder hosebirdClientBuilder = new ClientBuilder()
                .name("Keisan Knowledgebase Twitter Hosebird Client")
                .hosts(Constants.STREAM_HOST)
                .authentication(authentication)
                .endpoint(endpoint)
                .processor(new StringDelimitedProcessor(messageQueue));
        BasicClient hosebirdClient = hosebirdClientBuilder.build();

        // Create an Executor Service to spawn threads to parse the messages
        // Runnables are submitted to the Executor Service to process the Message Queue
        int numberProcessingThreads = 2;
        ExecutorService service = Executors.newFixedThreadPool(numberProcessingThreads);

        // Wrap a Twitter4j Client around the Hosebird Client using a custom Status Listener
        Twitter4jStatusClient twitter4jClient = new Twitter4jStatusClient(
                hosebirdClient, messageQueue, Lists.newArrayList(statusListener), service);

        // Connect to the Twitter Streaming API
        twitter4jClient.connect();

        // Twitter4jStatusClient.process must be called for every Message Processing Thread to be spawned
        for (int threads = 0; threads < numberProcessingThreads; threads++) {
            twitter4jClient.process();
        }

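        // Run the Producer for 60 seconds for DEV purposes
        // Note that this is NOT a graceful exit
        Thread.sleep(60000);

        // Stop collecting from the Twitter stream first, then close the Producer,
        // which waits for records that have already been sent to complete
        hosebirdClient.stop();
        producer.close();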

    }

    /**
     * Custom Status Listener
     * The onStatus method gets called for every new Tweet. It is here where we
     * will parse the incoming messages and generate the Avro Record which will be
     * serialised and sent using our Kafka Producer.
     */

    private static StatusListener statusListener = new StatusStreamHandler() {

        @Override
        public void onStatus(Status status) {

            // Convert the Status object into an Avro Record for serialising and publishing to the Kafka Topic
            GenericData.Record avroRecord = createRecord(status);
            byte[] avroRecordBytes = recordInjection.apply(avroRecord);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(KAFKA_TOPIC, avroRecordBytes);

            // Send the Message to Kafka
            producer.send(record);

        }

        @Override
        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}

        @Override
        public void onTrackLimitationNotice(int limit) {}

        @Override
        public void onScrubGeo(long user, long upToStatus) {}

        @Override
        public void onStallWarning(StallWarning warning) {}

        @Override
        public void onException(Exception e) {}

        @Override
        public void onDisconnectMessage(DisconnectMessage message) {}

        @Override
        public void onStallWarningMessage(StallWarningMessage warning) {}

        @Override
        public void onUnknownMessageType(String s) {}

    };

    /**
     * Parse and convert the Tweet Status into an Avro Record for serialising
     * and publishing to the Kafka Topic.
     * @param status The Twitter4j Status representing a single Tweet
     * @return The Avro Record populated from the Status
     */

    private static GenericData.Record createRecord(Status status) {

        User user = status.getUser();
        GenericData.Record doc = new GenericData.Record(schema);
        doc.put("id", String.valueOf(status.getId()));
        doc.put("user_name", user.getName());
        doc.put("text", status.getText());
        return doc;

    }

    public static void main(String[] args) {

        if ( args.length != 1 ) {
            System.err.println("Usage: TwitterProducer <broker>");
            System.exit(1);
        }

        try {

            // Create the Avro Schema
            Schema.Parser parser = new Schema.Parser();
            schema = parser.parse(TWEET_SCHEMA);
            recordInjection = GenericAvroCodecs.toBinary(schema);

            // Connect to the Twitter Streaming API and start the Producer
            TwitterProducer.run(args[0]);

        } catch (Exception e) {

            System.out.println(e);

        }

    }

}


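As per my last article, you can check that Kafka is receiving messages from our Kafka Producer by running a Kafka Consumer via the command line to subscribe to the Twitter Topic and consume messages as they arrive. Note that the message values are Avro binary, so the console will show the tweet text interspersed with non-printable bytes: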


# Run a Kafka Consumer subscribed to the Twitter Topic
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --zookeeper <ZooKeeper Hostname>:2181 --topic twitter


In my next article, we will be using this updated real-time event-based streaming data pipeline to build predictive models using Apache Spark's Streaming API and Machine Learning Library.
