As mentioned in previous article, Spring Kafka provides two MessageListenerContainer implementations: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer. The KafkaMessageListenerContainer provides ability to consume messages from Kafka topics in a single thread while the ConcurrentMessageListenerContainer allows us to consume messages in multi-threaded style. In this article, we will get to know more detail about multi-threaded message consumption by using the second implementation, the ConcurrentMessageListenerContainer.
1. ConcurrentMessageListenerContainer
Here is the general class diagram of the MessageListenerContainer and its children.
1.1. KafkaMessageListenerContainer
Firstly, let’s take a quick look at the KafkaMessageListenerContainer class. Here is one of its constructor:
1 2 |
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) |
The constructor require a ConsumerFactory object which includes information to create Apache Kafka consumer and a ContainerProperties object which includes information about the topics consumed by the consumer.
The second constructor is:
1 2 3 |
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitions) |
This constructor is used by the ConcurrentMessageListenerContainer and requires information about partition and offset of the topic.
1.2. ConcurrentMessageListenerContainer
From the class diagram, we can see that the ConcurrentMessageListenerContainer has a property concurrency that represents the number of concurrent message containers (Apache Kafka consumers) to consume the messages from Apache Kafka topic. We also see the relationship between it and the KafkaMessageListenerContainer. So, if the number concurrency is 3, then it will create 3 KafkaMessageListenerContainer objects.
One important thing we should remember when we work with Apache Kafka is the number of consumers in the same consumer group should be less than or equal the number of partitions in the consumed topic. Otherwise, the exceedable consumers will not be received any messages from the topic.
With Spring Kafka, if the concurrency is greater than number of the topic partitions, then the concurrency will be will be adjusted down such that each container will get one partition.
If we have 3 partitions and the concurrency is 3, then each container will get 1 partition. If we have 4 partitions, then 1 container will get 2, and the 2 remain ones will get 1 partition for each.
2. Multi-threaded Message Consumption Example
Here is the detail of the example.
- We have a topic: SpringKafkaTopic with 3 partitions.
- A ConcurrentMessageListenerContainer object with the concurrency = 3
- A KafkaTemplate, provided by Spring Kafka, to send the messages to the topic.
2.1. Preparations
- Apache Kafka 0.9/0.10 broker installed on local or remote. You can refer to this link for setting up.
- JDK 8 installed on your development PC.
- IDE (Eclipse or IntelliJ)
- Maven
2.2. Source Code Structure
The source code was added to the Github. You can clone from it or download the zip file. The zip file contains multiple examples of Spring Kafka. For this example, check the spring-kafka-multi-threaded-consumption sub project.
To import in to Eclipse.
- Menu File –> Import –> Maven –> Existing Maven Projects
- Browse to your source code location
Click Finish button to finish the importing
2.3. Spring Kafka dependencies
The only dependency we will need is spring-kafka
1 2 3 4 5 |
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.0.RELEASE</version> </dependency> |
However, this example is built on Spring Boot, and we also need to run JUnit tests, the dependencies are more as following:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.howtoprogram</groupId> <artifactId>spring-kafka-example1</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-kafka-example1</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>1.1.0.RELEASE</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> <scope>compile</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Brixton.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> |
2.4. Classes Descriptions
We will separate the configuration of the consumer and producer into different classes for easy to get.
2.4.1. KafkaConsumerConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
package com.howtoprogram.kafka; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; @Configuration @EnableKafka public class KafkaConsumerConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } @Bean public Listener listener() { return new Listener(); } } |
Note that we have configured a listener bean. Let’s take a look at it in the next section.
2.4.2. Listener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
package com.howtoprogram.kafka; import java.util.concurrent.CountDownLatch; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.TopicPartition; public class Listener { public CountDownLatch countDownLatch0 = new CountDownLatch(3); public CountDownLatch countDownLatch1 = new CountDownLatch(3); public CountDownLatch countDownLatch2 = new CountDownLatch(3); @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic", partitions = { "0" }) }) public void listenPartition0(ConsumerRecord<?, ?> record) { System.out.println("Listener Id0, Thread ID: " + Thread.currentThread().getId()); System.out.println("Received: " + record); countDownLatch0.countDown(); } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic", partitions = { "1" }) }) public void listenPartition1(ConsumerRecord<?, ?> record) { System.out.println("Listener Id1, Thread ID: " + Thread.currentThread().getId()); System.out.println("Received: " + record); countDownLatch1.countDown(); } @KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic", partitions = { "2" }) }) public void listenPartition2(ConsumerRecord<?, ?> record) { System.out.println("Listener Id2, Thread ID: " + Thread.currentThread().getId()); System.out.println("Received: " + record); countDownLatch2.countDown(); } } |
2.4.3. KafkaProducerConfig.java
This class define producer configuration which use KafkaTemplate to produce the messages to the topic.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
package com.howtoprogram.kafka; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @Configuration @EnableKafka public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } } |
2.4.4. kafka-topics.sh
The script to create the topic SpringKafkaTopic with partitions. Note that if we don’t create the topic, it will be created automatically but with only 1 partition, and we can not test correctly this example.
1 2 |
#cd $APACHE_KAFKA_HOME ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic SpringKafkaTopic |
The kafka-topics.bat is used to create the topic on Windows environment.
2.4.5. SpringKafkaMultipleConsumptionTests
The unit test class of our example.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
@RunWith(SpringRunner.class) @SpringBootTest public class SpringKafkaMultipleConsumptionTests { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private Listener listener; @Test public void contextLoads() throws InterruptedException { for (int i = 0; i < 9; i++) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("SpringKafkaTopic", "Messsage:" + i); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println("Sent message: " + result); } @Override public void onFailure(Throwable ex) { System.out.println("Failed to send message"); } }); } assertThat(this.listener.countDownLatch0.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.countDownLatch1.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.countDownLatch2.await(60, TimeUnit.SECONDS)).isTrue(); } } |
We have injected the kafkaTemplate to send 9 messages to the topic SpringKafkaTopic, and we expect that each of 3 container will receive 3 messages.
2.4.6 Run the example
Step 1. Make sure the Apache Kafka is running on localhost:9092
Step 2. Run the kafka-topics.sh or kafka-topics.bat to create the topic with 3 partitions.
Step 3. Open the SpringKafkaMultipleConsumptionTests.java, Right click –> Run As –> JUnit Test or use the shortcut: Alt+Shift+x, t to start the test.
The output on the console should be similar to:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
Listener Id2, Thread ID: 17 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 2, offset = 6, CreateTime = 1474782637845, checksum = 386545754, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:1) Listener Id1, Thread ID: 18 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 1, offset = 6, CreateTime = 1474782637845, checksum = 2382588384, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:2) Listener Id2, Thread ID: 17 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 2, offset = 7, CreateTime = 1474782637845, checksum = 1734397141, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:4) Listener Id0, Thread ID: 15 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 0, offset = 6, CreateTime = 1474782637839, checksum = 240789682, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:0) Listener Id2, Thread ID: 17 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 2, offset = 8, CreateTime = 1474782637846, checksum = 1564416966, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:7) Listener Id1, Thread ID: 18 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 1, offset = 7, CreateTime = 1474782637845, checksum = 275250243, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:5) Listener Id0, Thread ID: 15 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 0, offset = 7, CreateTime = 1474782637845, checksum = 4177811830, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:3) Listener Id0, Thread ID: 15 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 0, offset = 8, CreateTime = 1474782637846, checksum = 708324176, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:6) Listener Id1, Thread ID: 18 Received: ConsumerRecord(topic = SpringKafkaTopic, partition = 1, offset = 8, CreateTime = 1474782637846, checksum = 3447719511, serialized key size = -1, serialized value size = 10, key = null, value = Messsage:8) |
3. Summary
We have learned about multi-threaded message consumption by using ConcurrentMessageListenerContainer in the Spring for Apache Kafka. This wrapper of Spring Kafka facilitates the using of multi-threaded consumer model in Apache Kafka which improve the performance in message consumer. If you’re looking for the native approaches, you can refer to my previous post: Create Multi-threaded Apache Kafka Consumer
Below are other articles related to the Apache Kafka for your reference:
Spring Kafka Tutorial – Getting Started with Spring for Apache Kafka
Getting started with Apache Kafka 0.9
Apache Kafka 0.9 Java Client API Example
Create Multi-threaded Apache Kafka Consumer
Apache Kafka Command Line Interface
Write An Apache Kafka Custom Partitioner
How To Write A Custom Serializer in Apache Kafka
Apache Kafka Connect MQTT Source Tutorial
Apache Flume Kafka Source And HDFS Sink Tutorial
Thanks for the article!! Will you also be able to provide an example of using KafkaMessageListenerContainer with manual commit? I am having a tough time trying to figure out a way to do that with ‘spring-integration-kafka:2.0.0.RELEASE’. Also, hope this makes sense to ask, for both concurrent/single thread ListenContainer config: What would you recommend the below configuration to be if you have to process a high number of message(let’s say 2000/min) and each takes about 5sec to process: 1) Heartbeat interval ms config 2) session timeout ms config 3) auto commit interval ms config(if I set enable auto commit to be… Read more »