As mentioned in the previous article, Spring Kafka provides two MessageListenerContainer implementations: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer. The KafkaMessageListenerContainer consumes messages from Kafka topics on a single thread, while the ConcurrentMessageListenerContainer lets us consume messages on multiple threads. In this article, we will look more closely at multi-threaded message consumption using the second implementation, the ConcurrentMessageListenerContainer.

1. ConcurrentMessageListenerContainer

Here is the general class diagram of the MessageListenerContainer and its children.

Figure: MessageListener Class Diagram

1.1. KafkaMessageListenerContainer

First, let’s take a quick look at the KafkaMessageListenerContainer class. One of its constructors requires a ConsumerFactory object, which holds the information needed to create an Apache Kafka consumer, and a ContainerProperties object, which holds information about the topics the consumer reads.

A second constructor is used by the ConcurrentMessageListenerContainer. In addition to the arguments above, it requires the partitions and offsets of the topic to consume.

1.2. ConcurrentMessageListenerContainer

From the class diagram, we can see that the ConcurrentMessageListenerContainer has a concurrency property that represents the number of concurrent message containers (Apache Kafka consumers) used to consume messages from an Apache Kafka topic. We can also see its relationship with the KafkaMessageListenerContainer: if concurrency is 3, it creates 3 KafkaMessageListenerContainer objects.

One important thing to remember when working with Apache Kafka is that the number of consumers in the same consumer group should be less than or equal to the number of partitions in the consumed topic. Otherwise, the excess consumers will not receive any messages from the topic.

With Spring Kafka, when the container is configured with explicit topic partitions and the concurrency is greater than the number of those partitions, the concurrency is adjusted down so that each container gets one partition.

If we have 3 partitions and the concurrency is 3, then each container gets 1 partition. If we have 4 partitions, then 1 container gets 2 partitions and the 2 remaining containers get 1 partition each.
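The arithmetic above can be sketched in plain Java. This is only an illustration of the counts described in this section, not Spring Kafka's actual assignment code; the class name PartitionSpread and the method spread are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpread {

    // Returns how many partitions each container gets.
    // If the concurrency exceeds the partition count, it is adjusted down.
    public static List<Integer> spread(int partitions, int concurrency) {
        int containers = Math.min(partitions, concurrency);
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            // The first (partitions % containers) containers take one extra partition.
            int extra = i < partitions % containers ? 1 : 0;
            counts.add(partitions / containers + extra);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(spread(3, 3)); // [1, 1, 1]
        System.out.println(spread(4, 3)); // [2, 1, 1]
        System.out.println(spread(2, 3)); // [1, 1] (concurrency adjusted down to 2)
    }
}
```

The sketch only counts partitions per container; which partition numbers end up in which container is left out on purpose.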

2. Multi-threaded Message Consumption Example

Here are the details of the example.

  • We have a topic: SpringKafkaTopic with 3 partitions.
  • A ConcurrentMessageListenerContainer object with the concurrency = 3
  • A KafkaTemplate, provided by Spring Kafka, to send the messages to the topic.

2.1. Preparations

  • Apache Kafka 0.9/0.10 broker installed locally or on a remote machine. You can refer to this link for setup instructions.
  • JDK 8 installed on your development PC.
  • IDE (Eclipse or IntelliJ)
  • Maven

2.2. Source Code Structure

The source code is available on GitHub. You can clone the repository or download the zip file, which contains multiple Spring Kafka examples. For this example, check the spring-kafka-multi-threaded-consumption subproject.

To import the project into Eclipse:

  • Menu File –> Import –> Maven –> Existing Maven Projects
  • Browse to your source code location
  • Click the Finish button to complete the import

2.3. Spring Kafka dependencies

The only dependency we strictly need is spring-kafka.

However, because this example is built on Spring Boot and we also need to run JUnit tests, the project declares additional dependencies for both.

2.4. Classes Descriptions

We will separate the consumer and producer configuration into different classes to make them easier to follow.

2.4.1. KafkaConsumerConfig

We have specified the configuration for the consumer in the consumerConfigs() method. The concurrency=3 property is set in the kafkaListenerContainerFactory() method.

Note that we have configured a listener bean. Let’s take a look at it in the next section.
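A consumer configuration along these lines might look like the sketch below. It is not the original class from the repository: the group id "group1", the String key and value types, and the deserializers are assumptions made for illustration, while the method names consumerConfigs() and kafkaListenerContainerFactory() and the concurrency of 3 follow the description above. It requires spring-kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Assumption: the group id and the String (de)serialization are illustrative choices.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Three containers, one per partition of SpringKafkaTopic.
        factory.setConcurrency(3);
        return factory;
    }
}
```

The listener bean mentioned above would be registered in the same class with one more @Bean method; it is left out here to keep the sketch self-contained.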

2.4.2. Listener.java

We have used @KafkaListener to define the listeners for the different partitions of the topic SpringKafkaTopic. The countDownLatch properties are used mainly for testing purposes.

2.4.3. KafkaProducerConfig.java

This class defines the producer configuration, which uses KafkaTemplate to send messages to the topic.
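As a rough sketch, a producer configuration of this kind could be written as follows. The String key and value types, the serializers, and the bean method names are assumptions for illustration, not the original source. It requires spring-kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Assumption: String keys and values are used only to keep the sketch simple.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // KafkaTemplate wraps the producer and exposes the send(...) methods.
        return new KafkaTemplate<>(producerFactory());
    }
}
```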

2.4.4. kafka-topics.sh

This script creates the topic SpringKafkaTopic with 3 partitions. Note that if we don’t create the topic, it will be created automatically with the default broker settings, but with only 1 partition, and we cannot test this example correctly.

On Windows, use kafka-topics.bat to create the topic.

2.4.5. SpringKafkaMultipleConsumptionTests

This is the unit test class of our example.

We have injected the kafkaTemplate to send 9 messages to the topic SpringKafkaTopic, and we expect each of the 3 containers to receive 3 messages.
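The expectation of 3 messages per container can be illustrated without a broker. The sketch below is a plain-Java simulation, not the real test: three single-thread executors stand in for the three containers, one CountDownLatch per partition plays the role of the countDownLatch properties in the listener, and each message is assumed to be sent to partition i % 3. The class name LatchPerPartitionDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchPerPartitionDemo {

    static final int PARTITIONS = 3;
    static final int MESSAGES = 9;

    // Returns true if every partition's latch reaches zero within the timeout.
    public static boolean run() {
        CountDownLatch[] latches = new CountDownLatch[PARTITIONS];
        ExecutorService[] containers = new ExecutorService[PARTITIONS];
        for (int p = 0; p < PARTITIONS; p++) {
            latches[p] = new CountDownLatch(MESSAGES / PARTITIONS);
            // One thread per partition stands in for one listener container.
            containers[p] = Executors.newSingleThreadExecutor();
        }
        for (int i = 0; i < MESSAGES; i++) {
            // Assumption: each message targets an explicit partition.
            final int partition = i % PARTITIONS;
            containers[partition].submit(() -> latches[partition].countDown());
        }
        boolean allReceived = true;
        try {
            for (CountDownLatch latch : latches) {
                allReceived &= latch.await(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            allReceived = false;
        } finally {
            for (ExecutorService container : containers) {
                container.shutdown();
            }
        }
        return allReceived;
    }

    public static void main(String[] args) {
        System.out.println("all received: " + run()); // all received: true
    }
}
```

In the real test the same idea applies: the test thread waits on each latch with a timeout, and each listener counts its latch down once per record.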

2.4.6. Run the example

Step 1. Make sure Apache Kafka is running on localhost:9092.

Step 2. Run kafka-topics.sh or kafka-topics.bat to create the topic with 3 partitions.

Step 3. Open SpringKafkaMultipleConsumptionTests.java, then right click –> Run As –> JUnit Test, or use the shortcut Alt+Shift+x, t to start the test.

The output on the console should be similar to:

3. Summary

We have learned about multi-threaded message consumption using the ConcurrentMessageListenerContainer in Spring for Apache Kafka. This Spring Kafka wrapper makes it easier to use the multi-threaded consumer model in Apache Kafka, which can improve message consumption throughput. If you’re looking for the native approach, you can refer to my previous post: Create Multi-threaded Apache Kafka Consumer

Below are other articles related to Apache Kafka for your reference:

Spring Kafka Tutorial – Getting Started with Spring for Apache Kafka

Apache Kafka Tutorial

Getting started with Apache Kafka 0.9

Apache Kafka 0.9 Java Client API Example

Using Apache Kafka Docker

Create Multi-threaded Apache Kafka Consumer

Apache Kafka Command Line Interface

Write An Apache Kafka Custom Partitioner

How To Write A Custom Serializer in Apache Kafka

Apache Kafka Connect Example

Apache Kafka Connect MQTT Source Tutorial

Apache Flume Kafka Source And HDFS Sink Tutorial

 
