Apache Kafka, a distributed messaging system, is gaining a lot of traction today, and Spring is a very popular framework for Java developers, so getting Apache Kafka to work smoothly with Spring is valuable to many Java developers. In this Spring Kafka tutorial, we will get to know Spring Kafka (Spring for Apache Kafka), learn how to use KafkaTemplate to produce messages to Kafka brokers, and learn how to use a "listener container" to consume messages from Kafka as well.

Spring Kafka
1. Basic Spring Kafka
In this section, we will walk through the various components that make up Spring Kafka.
1.1. Sending Messages
In the same style as JmsTemplate or JdbcTemplate, Spring Kafka provides us with a "template" for Kafka called KafkaTemplate. It wraps a Kafka producer and provides many convenience methods for sending messages to Kafka brokers. Below are some of those methods:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);
ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
You can find more information about the template in the Spring Kafka reference documentation.
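As a quick illustration, here is a minimal, hypothetical sketch of sending a message through a KafkaTemplate. The GreetingSender class and the "Topic1" topic name are made up for this sketch; it only assumes a KafkaTemplate<String, String> bean like the one we configure later in this tutorial.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// Hypothetical helper used only to illustrate the KafkaTemplate API;
// it is not part of the example project below.
public class GreetingSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public GreetingSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public ListenableFuture<SendResult<String, String>> sendGreeting() {
        // send() is asynchronous; the returned ListenableFuture can be used to
        // register a callback, as shown in the unit test later in this post.
        return kafkaTemplate.send("Topic1", "Hello, Kafka");
    }
}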
1.2. Receiving Messages
To receive the messages, we have to:
- Configure the MessageListenerContainer
- Provide a Message Listener, or use the @KafkaListener annotation
1.2.1. MessageListenerContainer
There are two implementations of the MessageListenerContainer in Spring Kafka:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer consumes messages from Kafka topics on a single thread, while the ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances so that messages can be consumed in a multi-threaded style.
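For illustration, the snippet below is a minimal sketch of wiring a KafkaMessageListenerContainer by hand. It assumes a ConsumerFactory<String, String> like the one defined later in KafkaConsumerConfig, an assumed topic named topic1, and the Spring Kafka 1.1.x package layout (in 2.x, ContainerProperties moved to org.springframework.kafka.listener).

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

// Illustrative only: builds a single-threaded listener container programmatically,
// without using the @KafkaListener annotation.
public class SingleThreadedContainerSketch {

    public KafkaMessageListenerContainer<String, String> buildContainer(
            ConsumerFactory<String, String> consumerFactory) {
        // The container will subscribe to "topic1" (an assumed topic name).
        ContainerProperties containerProps = new ContainerProperties("topic1");
        // The MessageListener is invoked once for every ConsumerRecord received.
        containerProps.setMessageListener(
                (MessageListener<String, String>) record -> System.out.println(record));
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    }
}

Calling start() on the returned container begins polling. In the rest of this tutorial we rely on @KafkaListener and a container factory instead of building containers manually.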
1.2.2. @KafkaListener annotation
Spring Kafka provides the @KafkaListener annotation, which marks a method as the target of a Kafka message listener on the specified topics, for example:
public class Listener {

    @KafkaListener(id = "id01", topics = "Topic1")
    public void listen(String data) {
    }
}
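Note that for @KafkaListener methods to be detected, the application needs a @Configuration class annotated with @EnableKafka and a kafkaListenerContainerFactory bean; both are provided by the KafkaConsumerConfig class in the example below.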
2. Spring Kafka Example
In this part of the Spring Kafka tutorial, we will walk through an example that uses the Spring Kafka API to send and receive messages to/from Kafka topics.
2.1. Preparation
- An Apache Kafka 0.9/0.10 broker installed on a local or remote machine. You can refer to this link for setup instructions.
- JDK 8 installed on your development PC (the example project targets Java 8).
- IDE (Eclipse or IntelliJ)
- Build tool (Maven or Gradle)
2.2. Source Code Structure
The source code has been added to GitHub, or you can download it here.
Once you have the source code, you can import it into Eclipse and run the tests.
To import:
- Menu File –> Import –> Maven –> Existing Maven Projects
- Browse to your source code location
- Click the Finish button to complete the import
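If you prefer the command line, you can also run the tests with mvn clean test from the project root (the Kafka broker described below must be running for the test to pass).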
2.3. Library dependencies
The only dependency we need is spring-kafka:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
However, this example is built on Spring Boot and also needs to run JUnit tests, so the full list of dependencies is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.howtoprogram</groupId>
    <artifactId>spring-kafka-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-kafka-example</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Brixton.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2.4. Classes Descriptions
2.4.1. SpringKafkaExampleApplication.java
Entry point for Spring Boot application.
1 2 3 4 5 6 7 8 9 |
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringKafkaExampleApplication { public static void main(String[] args) { SpringApplication.run(SpringKafkaExampleApplication.class, args); } } |
2.4.2. KafkaProducerConfig.java
This class contains configuration for KafkaTemplate. We need to specify the properties for the Kafka producer.
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
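In short, BOOTSTRAP_SERVERS_CONFIG points the producer at the broker (localhost:9092 here), KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG tell the producer how to turn the String keys and values into bytes, and the remaining properties (retries, batch size, linger, buffer memory) tune retry and batching behavior.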
2.4.3. KafkaConsumerConfig.java
This class defines the configuration for consuming messages from the Kafka broker.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
We need to define a KafkaListenerContainerFactory, which creates the listener containers. In this example, we use the ConcurrentKafkaListenerContainerFactory, which produces ConcurrentMessageListenerContainer instances that can consume messages in a multi-threaded style.
Note that the broker address is: localhost:9092
We also define a Listener bean, which has a method annotated with @KafkaListener to "listen" for and process messages.
public class Listener {

    public final CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(id = "foo", topics = "topic1", group = "group1")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println(record);
        countDownLatch1.countDown();
    }
}
This listener has id = foo and group = group1 and consumes messages from the topic "topic1". It also has a countDownLatch1 field, which is used for unit-testing purposes below.
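Note that the Listener class itself carries no Spring annotation in this snippet; it is registered as a bean through the listener() method in KafkaConsumerConfig above, which is what allows its @KafkaListener method to be detected.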
2.4.4. SpringKafkaExampleApplicationTests
Unit test class to test our example.
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaExampleApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Listener listener;

    @Test
    public void contextLoads() throws InterruptedException {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("topic1", "ABC");
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("success");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("failed");
            }
        });
        System.out.println(Thread.currentThread().getId());
        assertThat(this.listener.countDownLatch1.await(60, TimeUnit.SECONDS)).isTrue();
    }
}
To send messages to the Kafka topic, we inject the kafkaTemplate bean (@Autowired). We also inject the listener bean (@Autowired) to verify the result.
We register a ListenableFutureCallback on the future returned by the kafkaTemplate to verify whether the message was sent to the topic "topic1" successfully or not.
2.4.5. Run the application
Step 1. Make sure the Kafka broker is running on localhost:9092
Step 2. Open SpringKafkaExampleApplicationTests.java, then right-click –> Run As –> JUnit Test, or use the shortcut Alt+Shift+X, T to start the test.
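If everything is wired correctly, the console shows the "success" message from the send callback and the ConsumerRecord printed by the listener, and the test passes because the CountDownLatch is counted down within the 60-second timeout.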
3. Summary
We have just walked through a Spring Kafka tutorial. Spring Kafka makes it easy to integrate Kafka into a Spring application, and we have built a simple example as well. In future posts, I'd like to provide more examples of using Spring Kafka, such as multi-threaded consumers, multiple KafkaListenerContainerFactory beans, etc. I also have some more articles on Apache Kafka; if you're interested, you can refer to the following links:
Getting started with Apache Kafka 0.9
Create Multi-threaded Apache Kafka Consumer
Apache Kafka Command Line Interface
Write An Apache Kafka Custom Partitioner
How To Write A Custom Serializer in Apache Kafka
Apache Kafka Connect MQTT Source Tutorial
Apache Flume Kafka Source And HDFS Sink Tutorial
Spring Kafka – Multi-threaded Message Consumption
Reader (Meme): My producer is working fine, but I can't get the consumer to work! When I send messages to my topic from the console, the listen() method is not invoked.
Reader (Needit2work): I have the same problem as Meme; I think there is a piece missing. The producer produces / the template sends, but I see no evidence that any of the 3 listeners are hearing the messages from the topic (using concurrency 3 in the settings, as per the example).
Author: Sorry Meme, I have just seen your comment.
@Needit2work: Can you download my example source code, import it into Eclipse, and run the tests? I have just tried it, and the consumer works.
One note: there are 3 containers (concurrency) in the same consumer group (group1). So if your topic has 3 partitions, the 3 containers will each be assigned one partition. If your topic has only one partition, then only one container is assigned to listen to that topic.
Let me know if you have any further questions.
Thanks,
Reader: More notes: using my Kafka distribution's kafka-console-consumer.sh script I can see the produced messages, plus of course the callback success in the JUnit test. I've added an event listener to the Listener to detect idleness, and the listeners definitely report idle, so they are there…
Thank you