In this article, I'd like to show you how to create a Kafka producer and consumer with the Apache Kafka Java client API.
1. Overview
Apache Kafka ships with built-in command line tools to produce and consume messages against an Apache Kafka broker. However, in terms of messaging, both are mainly intended for simple operations on the server (broker) side, where you may only need to:
- Create a Kafka topic.
- List all topics that exist in the brokers.
- Produce and consume some test data against some topics in the cluster.
Like many other messaging systems, Apache Kafka provides client APIs in many languages, such as Java, Python, Ruby, and Go, which make it easier to work with Kafka clusters. In this article, we will focus on the Java client API.
2. Prerequisites
- A single Apache Kafka broker installed on your local machine or a remote one. If you don't have one ready, you can set it up by following install Apache Kafka on Linux or install Apache Kafka on Windows.
- JDK 7/8 installed on your development PC.
- Eclipse 4 (I am using Eclipse Mars 4.5)
- Maven 3
3. Project structure
The example source code is available in my GitHub project. After downloading the source code, we can import it into Eclipse and run the tests.
To import:
- Menu File –> Import –> Maven –> Existing Maven Projects
- Browse to your source code location
- Click Finish button to finish the importing
Here is the project structure in my Eclipse:
4. Maven pom.xml file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.howtoprogram</groupId>
  <artifactId>kafka-java-example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>Kafka-Java-Example</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.1</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
5. Source Code
5.1. Producer source code
package com.howtoprogram.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.33.10:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(props);
            // Send 100 string messages to the HelloKafkaTopic topic.
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloKafkaTopic", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Guard against the constructor having thrown before assignment.
            if (producer != null) {
                producer.close();
            }
        }
    }
}
First, we create the Kafka producer from the configured properties:

producer = new KafkaProducer<>(props);
bootstrap.servers
props.put("bootstrap.servers", "192.168.33.10:9092");
bootstrap.servers is the list of host:port addresses the producer uses to connect to the Kafka cluster. If you have more than one broker, list them all, separated by commas, for example: 192.168.33.10:9092,192.168.33.10:9093
Currently, my broker is installed on another PC, which has the IP address 192.168.33.10 and listens on port 9092. If your environment is different, change this value to match yours.
key.serializer and value.serializer

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Each message sent to the Kafka cluster consists of an optional key and a value, which can be of any data type. However, we need to tell the producer how to serialize those data types into binary before they are sent to the cluster.
In this example, we produce text messages, so we use StringSerializer, a built-in serializer of the Kafka client, to serialize strings into binary.
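As a side note, the configuration keys above are plain strings, which are easy to mistype. The Kafka client also exposes them as constants on the ProducerConfig class, so an equivalent, slightly safer version of this setup could look like the following sketch:

// Equivalent configuration using the ProducerConfig constants
// (requires importing org.apache.kafka.clients.producer.ProducerConfig
// and org.apache.kafka.common.serialization.StringSerializer).
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.33.10:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());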
Produce messages to the Kafka cluster:

for (int i = 0; i < 100; i++) {
    String msg = "Message " + i;
    producer.send(new ProducerRecord<String, String>("HelloKafkaTopic", msg));
    System.out.println("Sent:" + msg);
}
The code above produces 100 messages, with the values "Message 0", "Message 1", … "Message 99", to the HelloKafkaTopic topic.
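One thing worth knowing is that send() is asynchronous: it places the record in a buffer and returns immediately. If you want to be notified when the broker has actually acknowledged (or rejected) a message, you can pass a Callback as the second argument. A minimal sketch, reusing the loop above (it additionally needs imports of org.apache.kafka.clients.producer.Callback and org.apache.kafka.clients.producer.RecordMetadata):

producer.send(new ProducerRecord<String, String>("HelloKafkaTopic", msg),
    new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
          // The send failed, e.g. the broker is unreachable.
          exception.printStackTrace();
        } else {
          // The broker has acknowledged the record.
          System.out.println("Acked: partition=" + metadata.partition()
              + ", offset=" + metadata.offset());
        }
      }
    });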
5.2. Consumer source code
package com.howtoprogram.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.33.10:9092");
        props.put("group.id", "group-1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        // Subscribe to the topic the producer writes to.
        kafkaConsumer.subscribe(Arrays.asList("HelloKafkaTopic"));
        // Poll the cluster in an endless loop and print every record.
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
}
Some essential properties for the consumer include:
bootstrap.servers
The Kafka cluster addresses. As with the producer, we can put a comma-separated list of broker host:port pairs here.
group.id
The id of the consumer group this consumer belongs to; consumers that share a group id split the partitions of the subscribed topics between them.
key.deserializer and value.deserializer
These are the deserializers the Kafka consumer uses to turn the binary data received from the cluster back into the desired data types. In this example, because the producer produces string messages, our consumer uses StringDeserializer, a built-in deserializer of the Kafka client API, to deserialize the binary data back into strings.
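For completeness: our producer sends values only, so record.key() will be null on the consumer side. If the producer had attached a key, for example new ProducerRecord<String, String>("HelloKafkaTopic", "some-key", msg) (the key "some-key" is just an illustration), the consumer could read it back with record.key() after the StringDeserializer has decoded it.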
Our consumer subscribes to the topic HelloKafkaTopic:

kafkaConsumer.subscribe(Arrays.asList("HelloKafkaTopic"));
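Since subscribe accepts a list, the same consumer could also read from several topics at once, for example (AnotherTopic is just a hypothetical second topic):

kafkaConsumer.subscribe(Arrays.asList("HelloKafkaTopic", "AnotherTopic"));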
Consume messages from the cluster:

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s", record.offset(), record.value());
        System.out.println();
    }
}
Our consumer polls the cluster for new messages in an endless loop. The poll method takes a timeout of 100 milliseconds, which is how long the consumer waits when no message is available. For every batch of messages found, we print the offset and value of each record, line by line.
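Because enable.auto.commit is set to true above, offsets are committed in the background every auto.commit.interval.ms (1 second here). If you prefer to commit offsets only after the records have been fully processed, a common variation is to disable auto-commit and call commitSync() yourself. A minimal sketch, assuming the rest of the consumer configuration stays the same as above:

props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }
    // Commit the offsets of the records returned by the last poll
    // only after all of them have been processed.
    kafkaConsumer.commitSync();
}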
6. Execute the source code
6.1. ProducerTest.java
From Eclipse:
Open ProducerTest.java
Right-click –> Run as –> Java Application
You will see the output on the console as below:
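Something like the following lines, printed by the System.out.println call in the loop:

Sent:Message 0
Sent:Message 1
Sent:Message 2
...
Sent:Message 99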
6.2. ConsumerTest.java
From Eclipse:
Open ConsumerTest.java
Right-click –> Run as –> Java Application
You will see the output on the console as below:
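Roughly like the following (the exact offsets depend on what is already stored in the topic; on a freshly created topic they start at 0):

offset = 0, value = Message 0
offset = 1, value = Message 1
offset = 2, value = Message 2
...
offset = 99, value = Message 99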
7. Conclusion
This article has shown you how to create a very simple producer that produces string messages, and a consumer that consumes those messages. Both are implemented with the Apache Kafka 0.9 Java client API. The example is deliberately basic; I hope it helps those looking for a getting-started tutorial for Apache Kafka, especially version 0.9.
Below are some related articles on Apache Kafka. If you're interested in them, you can refer to the following links:
Getting started with Apache Kafka 0.9
Apache Kafka Command Line Interface
Create Multi-threaded Apache Kafka Consumer
Apache Kafka Connect MQTT Source Tutorial
Apache Flume Kafka Source And HDFS Sink Tutorial