In this article, I'd like to show you how to create a producer and a consumer using the Apache Kafka Java client API.

1. Overview

Apache Kafka ships with built-in client tools for producing and consuming messages against a Kafka broker. However, these tools are mainly meant for simple operations on the server (broker) side, where you may only need to:

  • Create a Kafka topic.
  • List all topics that exist in the brokers.
  • Produce and consume some test data against topics in the cluster.

Like many other messaging systems, Apache Kafka provides client APIs in different languages such as Java, Python, Ruby, and Go, which make it easier to work with Kafka clusters. In this article, we will focus on the Java client API.

2. Prerequisites

3. Project structure

The example source code is available in my GitHub project. Once you have the source code, you can import it into Eclipse and run the tests.

To import:

  • Menu File –> Import –> Maven –> Existing Maven Projects
  • Browse to your source code location
  • Click the Finish button to complete the import

Here is the project structure in my Eclipse workspace:

Figure: Apache Kafka 0.9 Java Client API project structure

4. Maven pom.xml file

We use the kafka-clients 0.9.0.1 library for this example, and the project is compiled with Java 1.8.

5. Source Code

5.1. Producer source code

To create a producer, we just need to create a new instance of the KafkaProducer class and pass it a set of properties that initialize the instance. Below are the essential properties used in this article:

bootstrap.servers

bootstrap.servers is the list of host:port addresses of the brokers in the Kafka cluster. If you have more than one broker, you can list them all, separated by commas. For example: 192.168.33.10:9092,192.168.33.10:9093

Currently, my broker is installed on another PC with the IP address 192.168.33.10 and listens on port 9092. If your environment is different, change these values to match yours.

key.serializer and value.serializer

A message sent to the Kafka cluster consists of a key (optional) and a value, each of which can be of any data type. However, we need to specify how the Kafka producer should serialize those data types into binary before sending them to the cluster.

In this example, we produce text messages to the Kafka cluster. Therefore, we use StringSerializer, a built-in serializer of the Kafka client, to serialize strings into binary.
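To make "serialize into binary" concrete, here is a minimal sketch that turns a string value into bytes and back using UTF-8, the default encoding of Kafka's built-in string serializer. The class and method names are hypothetical, and the sketch uses only the JDK rather than the Kafka client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Kafka client: it only illustrates
// the string <-> bytes conversion that a string serializer performs.
class StringBytesSketch {

    // Convert a message value into the bytes that would be sent to the broker.
    static byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Convert received bytes back into the original string.
    static String fromBytes(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = toBytes("Message 1");
        System.out.println(wire.length + " bytes");
        System.out.println(fromBytes(wire));
    }
}
```

For other data types, the same idea applies: the producer needs a serializer that knows how to map the type to bytes.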
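The producer settings described above could be assembled roughly as follows. This is a sketch rather than the project's actual code: the class and method names are hypothetical, and it only builds the java.util.Properties object, so it runs without the Kafka library.

```java
import java.util.Properties;

// Hypothetical helper class; the names are chosen for this sketch only.
class ProducerConfigSketch {

    // Build the producer settings discussed in this section.
    // The broker address is a parameter because it depends on your environment.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Fully qualified class names of the built-in string serializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("192.168.33.10:9092");
        // With kafka-clients on the classpath, these properties would be
        // passed to: new KafkaProducer<String, String>(props)
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```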

Produce messages to the Kafka cluster

The source code above produces 100 messages, with values such as “Message 1”, “Message 2”, … “Message 99”, to the HelloKafkaTopic topic.
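A sending loop of this kind might look like the sketch below. It is not the project's code: the class name is hypothetical, numbering the messages from 0 to 99 is an assumption, and a stand-in sender collects the values so the example runs without a broker. A comment shows where the real producer call would go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical class for illustration only.
class ProduceLoopSketch {

    // Generate `count` message values and hand each one to `sender`.
    // Numbering from 0 is an assumption made for this sketch.
    static void produce(int count, Consumer<String> sender) {
        for (int i = 0; i < count; i++) {
            sender.accept("Message " + i);
        }
    }

    public static void main(String[] args) {
        // Stand-in sender that collects values instead of contacting a broker.
        // With a real KafkaProducer<String, String> named producer it could be:
        //   value -> producer.send(new ProducerRecord<>("HelloKafkaTopic", value))
        List<String> sent = new ArrayList<>();
        produce(100, sent::add);
        System.out.println(sent.size() + " messages, last = " + sent.get(sent.size() - 1));
    }
}
```

In a real program, remember to call close() on the producer once the loop finishes so that buffered messages are flushed.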

5.2. Consumer source code

To create a Kafka consumer, we likewise just need to create an instance of the KafkaConsumer class and pass it a set of properties.

Some essential properties for the consumer include:

bootstrap.servers

The addresses of the Kafka cluster. As with the producer, we can put a comma-separated list of broker host:port addresses here.

group.id

It is the ID of the consumer group to which the consumer belongs.

key.deserializer and value.deserializer

These are the deserializers the Kafka consumer uses to convert the binary data received from the Kafka cluster into the desired data types. In this example, because the producer produces string messages, our consumer uses StringDeserializer, a built-in deserializer of the Kafka client API, to deserialize the binary data into strings.

Our consumer subscribes to the topic HelloKafkaTopic.
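The consumer settings listed above could be put together along these lines. Again this is only a sketch: the class and method names are hypothetical, the group ID "hello-kafka-group" is a placeholder, and the code only builds the java.util.Properties object.

```java
import java.util.Properties;

// Hypothetical helper class; the names are chosen for this sketch only.
class ConsumerConfigSketch {

    // Build the consumer settings discussed in this section.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Fully qualified class names of the built-in string deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "hello-kafka-group" is a placeholder group ID, not taken from the project.
        Properties props = consumerProps("192.168.33.10:9092", "hello-kafka-group");
        // With kafka-clients on the classpath, these properties would be
        // passed to: new KafkaConsumer<String, String>(props)
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```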

Consume messages from the cluster

Our consumer polls the cluster for new messages. The poll method takes a timeout of 100 milliseconds, which is how long the consumer waits if no messages are available from the cluster. Once messages are found, we print their offsets and values line by line.
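The printing step can be sketched as below. To keep the example runnable without a broker, it uses a hypothetical stand-in record type (Rec) instead of Kafka's ConsumerRecord, and the output format is an assumption. The comments indicate where the real poll call would sit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical class for illustration only.
class PrintRecordsSketch {

    // Stand-in for a consumed record, holding only the two fields we print.
    static final class Rec {
        final long offset;
        final String value;

        Rec(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    // Turn one batch of records into printable lines, one per record.
    static List<String> formatRecords(List<Rec> records) {
        List<String> lines = new ArrayList<>();
        for (Rec r : records) {
            lines.add("offset = " + r.offset + ", value = " + r.value);
        }
        return lines;
    }

    public static void main(String[] args) {
        // In a real consumer, each batch would come from consumer.poll(100)
        // inside a loop, and offset() and value() would be read from each
        // ConsumerRecord<String, String> in the returned ConsumerRecords.
        List<Rec> batch = Arrays.asList(new Rec(0, "Message 0"), new Rec(1, "Message 1"));
        formatRecords(batch).forEach(System.out::println);
    }
}
```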

6. Execute the source code

6.1. ProducerTest.java

From Eclipse:

Open ProducerTest.java

Right-click –> Run as –> Java Application

You will see the output on the console as below:

Figure: Console output from executing ProducerTest.java

6.2. ConsumerTest.java

From Eclipse:

Open ConsumerTest.java

Right-click –> Run as –> Java Application

You will see the output on the console as below:

 

Figure: Console output from executing ConsumerTest.java

7. Conclusion

This article showed how to create a very simple producer that produces string messages and a consumer that consumes them. Both are implemented with the Apache Kafka 0.9 Java client API. The example is deliberately basic. I hope it helps those looking for a basic tutorial to get started with Apache Kafka, especially version 0.9.

Below are other articles related to Apache Kafka. If you are interested, you can refer to the following links:

Apache Kafka Tutorial

Getting started with Apache Kafka 0.9

Apache Kafka Command Line Interface

Create Multi-threaded Apache Kafka Consumer

Using Apache Kafka Docker

Apache Kafka Connect Example

Apache Kafka Connect MQTT Source Tutorial

Apache Flume Kafka Source And HDFS Sink Tutorial

Spring Kafka Tutorial

 
