In previous posts, I showed how to get started with Apache Kafka by installing it and using the Java client API 0.9. In this post, I’d like to share how to create a multi-threaded Apache Kafka consumer.

You can find the related earlier posts at the links below.

Getting started with Apache Kafka 0.9

Apache Kafka 0.9 Java Client API Example

1. Why do we need a multi-threaded consumer model?

Suppose we implement a notification module that allows users to subscribe to notifications from other users and other applications. Those users and applications write their messages to a Kafka cluster: all notifications go to a single Kafka topic, and our module creates a consumer that subscribes to that topic.

Everything seems fine at the beginning. However, what happens if the number of notifications produced by other applications and users grows quickly and exceeds the rate at which our module can process them?

Well, everything is still…not bad. All the messages/notifications that haven’t been processed by our module are still in the Kafka topic. However, things get more dangerous when the backlog grows too large. Some messages will be lost once the retention policy kicks in (note that a Kafka retention policy can be time-based, partition-size-based, or key-based). More importantly, when our notification module falls far behind the incoming notifications/messages, it is no longer a true “notification” module.

It’s time to think about a multi-threaded consumer model.

2. Multi-threaded Apache Kafka consumer model

There are 2 possible models I’d like to mention in this post.

  • Multiple consumers with their own threads (Model #1)
  • Single consumer, multiple worker processing threads (Model #2)

Both of them have their own pros and cons.

2.1 Model #1. Multiple consumers with their own threads

Figure: Multiple consumers with their own threads

Pros:

  • Easy to implement.
  • Implementing in-order processing per partition is easier.

Cons:

  • The number of useful consumers is limited by the number of partitions of the topic. Any consumers beyond that number sit idle.
  • More TCP connections to the brokers.
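To see why extra consumers go unused, here is a minimal sketch of partition assignment within one consumer group. It is a simplified round-robin stand-in written for illustration, not the assignor that Kafka itself uses, and the names `AssignmentSketch`, `assign` and `idleConsumers` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: a simplified round-robin assignment,
// not Kafka's own partition assignor.
final class AssignmentSketch {

    // Returns, for each consumer index, the list of partition numbers it owns.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        // Within a group, each partition is owned by exactly one consumer.
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    // Counts consumers that received no partition at all.
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 3 partitions shared by 5 consumers: two consumers have nothing to read.
        System.out.println(assign(3, 5));
        System.out.println("Idle consumers: " + idleConsumers(3, 5));
    }
}
```

With 3 partitions, adding a fourth or fifth consumer thread to the group does not increase throughput; only adding partitions does.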

2.2 Model #2. Single consumer, multiple worker processing threads

Figure: Single consumer, multiple worker processing threads

Pros:

  • Flexible in scaling out the number of processing threads.

Cons:

  • In-order processing per partition is not easy to implement. Say 2 messages from the same partition are processed by 2 different threads; to guarantee the order, those 2 threads must be coordinated somehow.
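One possible way to coordinate the workers, sketched here under stated assumptions, is to route every record of a given partition to the same single-threaded executor. Records from one partition are then processed in arrival order, while different partitions can still run in parallel. This is not taken from the example project: `OrderedDispatcher` and its methods are hypothetical names, and records are plain strings so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded "lane" per worker, chosen by partition.
final class OrderedDispatcher {
    private final List<ExecutorService> lanes = new ArrayList<>();
    // partition -> values in the order they were actually processed
    private final Map<Integer, List<String>> processed = new ConcurrentHashMap<>();

    OrderedDispatcher(int workers) {
        for (int i = 0; i < workers; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    // Records of the same (non-negative) partition always land on the same lane,
    // and a single-threaded executor runs its tasks in submission order.
    void dispatch(int partition, String value) {
        ExecutorService lane = lanes.get(partition % lanes.size());
        lane.execute(() -> {
            processed.computeIfAbsent(partition, p -> new CopyOnWriteArrayList<>()).add(value);
        });
    }

    // Waits for all queued work to finish and returns what was processed.
    Map<Integer, List<String>> shutdownAndGet() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        OrderedDispatcher dispatcher = new OrderedDispatcher(2);
        for (int i = 0; i < 6; i++) {
            dispatcher.dispatch(i % 3, "Message " + i);
        }
        System.out.println(dispatcher.shutdownAndGet());
    }
}
```

The trade-off is that one slow partition can hold up every other partition that shares its lane.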

3. Implementation

Below are the implementation details of both models.

3.1 Prerequisites

  • Apache Kafka 0.9/0.10 broker installed on a local or remote machine. You can refer to this link for setting it up.
  • JDK 7/8 installed on your development PC.
  • Eclipse 4 (I am using Eclipse Mars 4.5)
  • Maven 3

3.2 Source Code Structure

The example source code is available on GitHub. You can use git to clone the repository to your PC, or simply download the zip version of the example and extract it.

Once you have the source code, you can import it into Eclipse and run the test.

To import:

  • Menu File -> Import -> Maven -> Existing Maven Projects
  • Browse to your source code location
  • Click Finish to complete the import

Here is the project structure in my Eclipse:

Figure: Create Multi-threaded Apache Kafka Consumer - Source Code

The source code includes the implementation of both models above. The package com.howtoprogram.kafka.multipleconsumers contains all the source code for Model #1 (multiple consumers with their own threads), and the package com.howtoprogram.kafka.singleconsumer contains all the source code for Model #2 (single consumer, multiple worker processing threads).

3.3 Maven pom.xml

This example uses the kafka-clients 0.9.0.1 library and targets Java compiler level 1.8.
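The relevant parts of such a pom.xml would look roughly like the fragment below. It is a sketch based only on the library version and compiler level named above; the actual file in the repository may contain more entries.

```xml
<dependencies>
  <!-- Kafka Java client, version as stated in this post -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <!-- Compile for Java 1.8 -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
  </plugins>
</build>
```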

3.4 Multiple consumers with their own threads (Model #1)

3.4.1 Class Diagram

Figure: Multiple consumers with their own threads - Class Diagram

The source code for this part includes 4 classes:

NotificationProducerThread.java is a producer thread that produces messages to the Kafka brokers.

NotificationConsumerThread.java is a consumer thread that consumes messages from the Kafka brokers.

NotificationConsumerGroup.java creates a group of NotificationConsumerThread(s).

MultipleConsumersMain.java contains the main method and runs the program to produce and consume messages.

3.4.2 NotificationProducerThread.java

When this producer thread runs, it produces 5 messages to the broker.

3.4.3 NotificationConsumerThread.java

When this consumer thread runs, it polls data from its topics or partitions.

3.4.4 NotificationConsumerGroup.java

This class creates a group of consumer threads based on the given parameters:

brokers: The Kafka brokers to which the consumer group will connect

groupId: The group id. All consumers in this group share the same groupId

topic: The topic from which the consumer group will fetch data

numberOfConsumer: The number of consumers to create for the group
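As a rough sketch of how these parameters might map onto consumer settings, the helper below builds one `Properties` object per consumer. The property keys are standard Kafka consumer configuration names; the class and method names are hypothetical, the String deserializers are an assumption (the example messages are plain text), and the project's real code may differ. Only `java.util.Properties` is used, so the sketch compiles without the Kafka library; each result would then be passed to a `KafkaConsumer` constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: turns the group parameters into consumer configuration.
final class ConsumerGroupConfigSketch {

    static Properties consumerConfig(String brokers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // Every consumer in the group uses the same group.id.
        props.setProperty("group.id", groupId);
        // Assumption: keys and values are plain strings.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // One configuration per consumer; each consumer would run on its own thread.
    static List<Properties> groupConfigs(String brokers, String groupId, int numberOfConsumers) {
        List<Properties> configs = new ArrayList<>();
        for (int i = 0; i < numberOfConsumers; i++) {
            configs.add(consumerConfig(brokers, groupId));
        }
        return configs;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "notification-group" are placeholder values.
        for (Properties p : groupConfigs("localhost:9092", "notification-group", 3)) {
            System.out.println(p.getProperty("group.id") + " @ " + p.getProperty("bootstrap.servers"));
        }
    }
}
```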

3.4.5 MultipleConsumersMain.java

This class is the entry point and contains the main method for testing our source code. In this class we create a producer thread that produces 5 messages to the topic HelloKafkaTopic1, which has 3 partitions.

Then we create a group of 3 consumers, each on its own thread, to consume the messages from the HelloKafkaTopic1 topic.

3.4.6 Run the example

Create a topic named HelloKafkaTopic1 with 3 partitions.
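With a Kafka 0.9/0.10 installation, the topic can be created with the bundled command line tool. The command below is a sketch that assumes you run it from the Kafka installation directory, that ZooKeeper listens on localhost:2181, and that a single broker is running (hence a replication factor of 1); adjust these values to your setup.

```shell
# Assumed values: ZooKeeper at localhost:2181, one broker, replication factor 1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic HelloKafkaTopic1
```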

Open MultipleConsumersMain.java in Eclipse. Right-click -> Run As -> Java Application, or use the shortcut Alt+Shift+X, J, to start the main method.

The output in my Eclipse console is as follows:

Produces 3 messages
Sent:Message 0, Partition: 2, Offset: 21
Sent:Message 3, Partition: 2, Offset: 22
Sent:Message 1, Partition: 1, Offset: 24
Sent:Message 4, Partition: 1, Offset: 25
Sent:Message 2, Partition: 0, Offset: 21
Receive message: Message 2, Partition: 0, Offset: 21, by ThreadID: 13
Receive message: Message 1, Partition: 1, Offset: 24, by ThreadID: 14
Receive message: Message 4, Partition: 1, Offset: 25, by ThreadID: 14
Receive message: Message 0, Partition: 2, Offset: 21, by ThreadID: 15
Receive message: Message 3, Partition: 2, Offset: 22, by ThreadID: 15

Note that HelloKafkaTopic1 has 3 partitions. The Kafka producer client assigned Messages 0 and 3 to partition #2, Messages 1 and 4 to partition #1, and Message 2 to partition #0.

The consumer group includes 3 consumers, each with its own thread, with ThreadIDs 13, 14 and 15. The message of partition #0 was consumed by consumer thread #13, the messages of partition #1 by consumer thread #14, and the messages of partition #2 by consumer thread #15.

Note that you may get different partition numbers and ThreadIDs. However, in this case, each partition is handled by exactly one consumer thread.

3.5 Model #2: Single consumer, multiple worker processing threads

3.5.1 Class Diagram

Figure: Single consumer, multiple worker processing threads - Class Diagram

The source code for this part includes 4 classes too:

NotificationProducerThread.java is a producer thread that produces messages to the Kafka brokers.

NotificationConsumer.java is a consumer that owns a pool of background threads; it receives messages from the topic and dispatches them to the pool.

ConsumerThreadHandler.java is a worker thread that handles the business processing for each message dispatched from the NotificationConsumer.

SingleConsumerMain.java includes the main method and runs the program to produce and consume messages.

3.5.2 NotificationProducerThread.java

This is the producer thread, which produces 5 messages to the broker.

3.5.3 ConsumerThreadHandler.java

This thread processes a message dispatched from the consumer. In this example, it simply prints out the message, its offset on the topic, and the current ThreadID.

3.5.4 NotificationConsumer.java

This class contains a thread pool of ConsumerThreadHandler threads. It receives messages from the topic and dispatches each one to a handler in the pool.
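The dispatch pattern can be sketched without a broker as follows. Here a `BlockingQueue` stands in for the Kafka topic: one thread drains it and hands each message to a fixed pool of workers. In real code the draining thread would be the only one calling the `KafkaConsumer`, since `KafkaConsumer` is not safe for multi-threaded access. The class name `SingleConsumerSketch` and its members are hypothetical and are not the project's actual classes.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a queue stands in for the topic, a fixed pool for the handlers.
final class SingleConsumerSketch {
    final List<String> handled = new CopyOnWriteArrayList<>();
    final Set<String> workerNames = ConcurrentHashMap.newKeySet();
    private final ExecutorService pool;

    SingleConsumerSketch(int numberOfThreads) {
        this.pool = Executors.newFixedThreadPool(numberOfThreads);
    }

    // Drains the queue on the calling thread and hands each message to the pool.
    void consume(BlockingQueue<String> topic) {
        for (String message = topic.poll(); message != null; message = topic.poll()) {
            String current = message;
            pool.execute(() -> {
                String worker = Thread.currentThread().getName();
                workerNames.add(worker);
                handled.add(current);
                System.out.println("Process: " + current + ", By Thread: " + worker);
            });
        }
    }

    // Stops accepting work and waits for the submitted handlers to finish.
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            topic.add("Message " + i);
        }
        SingleConsumerSketch consumer = new SingleConsumerSketch(3);
        consumer.consume(topic);
        consumer.shutdown();
    }
}
```

Because the workers run concurrently, the order of the printed lines can differ from the order in which the messages were queued, which is exactly the ordering cost of this model.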

3.5.5 SingleConsumerMain.java

The entry point, which contains the main method to run the example. It creates a NotificationProducerThread that produces 5 messages to the topic HelloKafkaTopic. Then it creates a NotificationConsumer object, which receives the messages from that topic and dispatches them to a pool of 3 ConsumerThreadHandler threads for processing.

3.5.6 Run the example

Open SingleConsumerMain.java in Eclipse. Right-click -> Run As -> Java Application, or use the shortcut Alt+Shift+X, J, to start the main method.

The output in my Eclipse console is as follows:

Produces 5 messages
Sent:Message 0, Offset: 2602
Sent:Message 1, Offset: 2603
Process: Message 0, Offset: 2602, By ThreadID: 13
Process: Message 1, Offset: 2603, By ThreadID: 14
Sent:Message 2, Offset: 2604
Process: Message 2, Offset: 2604, By ThreadID: 15
Sent:Message 3, Offset: 2605
Process: Message 3, Offset: 2605, By ThreadID: 13
Sent:Message 4, Offset: 2606
Process: Message 4, Offset: 2606, By ThreadID: 14

The producer produced 5 messages with offsets from 2602 to 2606. Those messages were processed by a pool of threads with IDs 13, 14 and 15.

Note that you may get different offsets and ThreadIDs.

4. Conclusion

We have looked at how to create a multi-threaded Apache Kafka consumer using 2 possible models. Each has its own pros and cons, and which one is suitable depends on the specific circumstances. There may be cases where Model #1 is suitable; in that case, each partition of a topic is handled by its own consumer thread. However, if the number of messages in a partition is too large and the consumer falls far behind, we may need to combine Model #1 and Model #2.

Below are more articles about Apache Kafka. If you’re interested, you can refer to the following links:

Apache Kafka Tutorial

Getting started with Apache Kafka 0.9

Using Apache Kafka Docker

Apache Kafka 0.9 Java Client API Example

Apache Kafka Command Line Interface

How To Write A Custom Serializer in Apache Kafka

Write An Apache Kafka Custom Partitioner

Apache Kafka Connect Example

Apache Flume Kafka Source And HDFS Sink Tutorial