To continue the series about Apache Kafka, I'd like to share how to write a custom serializer.

1. Why do we need a custom serializer in Apache Kafka?

Apache Kafka lets us send messages of different data types to the broker: strings, numbers, arrays, or any other kind of object. Serializers prepare a message for transmission from the producer to the broker. More precisely, they tell the producer how to convert the message into a byte array before it is sent to the broker. Conversely, deserializers are used by the consumer to convert the byte array back into an object.

Apache Kafka Serializer and Deserializer

2. How to write a custom serializer in Apache Kafka.

2.1. Prerequisites

  • An Apache Kafka 0.9 broker installed on a local or remote machine. You can refer to this link for setup instructions.
  • JDK 7/8 installed on your development PC.
  • Eclipse 4 (I am using Eclipse Mars 4.5)
  • Maven 3

2.2. Source code structure

The example source code is available on GitHub. You can use git to clone the repository to your PC, or simply download the zip version of the example and extract it.

After having the source code, you can import the source code into Eclipse and run the test.

To import:

  • Menu File –> Import –> Maven –> Existing Maven Projects
  • Browse to your source code location
  • Click the Finish button to complete the import

Here is the project structure in my Eclipse:

Custom Serializer in Apache Kafka – Source Code

2.3. Maven Pom.xml

For this example we use the kafka-clients-0.9.0.1 library, Java compiler level 1.8, and the jackson-databind library, which simplifies the serialization and deserialization steps.

2.4. Class Descriptions

The source code includes 6 classes:

User.java. Encapsulates user information. The producer creates User objects and sends them to the broker, while the consumer receives them from the broker.

UserSerializer.java. Converts a User object to a byte array.

UserDeserializer.java. Converts a byte array back to a User object.

UserProducerThread.java. The producer class.

UserConsumerThread.java. The consumer class.

KakfaCustomSerializerMain.java. The entry point, used for testing.

2.4.1. User.java

Holds basic user information. We override toString() to print out the object's content.
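
As a rough illustration, the class could look like the sketch below. The field names come from the sample output shown later in this article; the field types, the constructors, and the getters and setters are assumptions, added so that Jackson can read and write the object.

```java
// Declared without "public" so the sketch fits in any file; in a real project
// this would be a public class in User.java.
class User {
    private int id;              // assumed type
    private String userName;
    private String firstName;
    private String lastName;
    private int age;             // assumed type

    // No-argument constructor, used by Jackson when it rebuilds a User from JSON.
    public User() { }

    public User(int id, String userName, String firstName, String lastName, int age) {
        this.id = id;
        this.userName = userName;
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
    }

    // Getters and setters expose each property to Jackson.
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    // Produces the "User [id=..., userName=..., ...]" form seen in the console output.
    @Override
    public String toString() {
        return "User [id=" + id + ", userName=" + userName + ", firstName=" + firstName
                + ", lastName=" + lastName + ", age=" + age + "]";
    }
}
```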

2.4.2. UserSerializer.java

We have to implement the Serializer interface of the Apache Kafka client API. The main method to implement is serialize, which converts the object into a byte array and returns it. Here we use the ObjectMapper class from the jackson-databind library to serialize the whole object into a byte array in one call.
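
A minimal sketch of such a serializer is shown below; it is not necessarily identical to the class in the repository. It assumes kafka-clients and jackson-databind are on the classpath. The abridged User stand-in, the null check, and the wrapping of Jackson errors in Kafka's SerializationException are choices made for this sketch.

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Abridged stand-in for the User class of section 2.4.1, so the sketch compiles on its own.
class User {
    public int id;
    public String userName;
}

// In a real project this class should be public (in UserSerializer.java) so that
// Kafka can instantiate it from the class name configured in value.serializer.
class UserSerializer implements Serializer<User> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure in this sketch.
    }

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null; // pass null values through unchanged
        }
        try {
            // Jackson writes the object as UTF-8 encoded JSON bytes.
            return mapper.writeValueAsBytes(user);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize User", e);
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```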

2.4.3. UserDeserializer.java

As with the serializer, we have to implement the Deserializer interface of the Apache Kafka client API. The method to implement is deserialize. Here we again use the ObjectMapper class to convert the byte array back into a User object.
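
The matching deserializer can be sketched in the same way, again assuming kafka-clients and jackson-databind are on the classpath. The abridged User stand-in and the error handling are assumptions of this sketch, not a copy of the repository code.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

// Abridged stand-in for the User class of section 2.4.1, so the sketch compiles on its own.
class User {
    public int id;
    public String userName;
}

// In a real project this class should be public (in UserDeserializer.java) so that
// Kafka can instantiate it from the class name configured in value.deserializer.
class UserDeserializer implements Deserializer<User> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure in this sketch.
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // a null payload yields a null object
        }
        try {
            // Jackson parses the JSON bytes and fills in a new User instance.
            return mapper.readValue(data, User.class);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize User", e);
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```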

2.4.4. UserProducerThread.java

The producer thread. When run, it sends a list of 2 users to the broker.

Importantly, we set the producer's value.serializer property to our custom serializer.
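
The registration itself is just a configuration entry. The sketch below builds the producer properties with plain java.util.Properties. The package name com.example.kafka, the helper class UserProducerConfig, and the String key serializer are hypothetical choices for illustration.

```java
import java.util.Properties;

// Hypothetical helper that collects the producer settings in one place.
class UserProducerConfig {
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        // Address of the Kafka broker, for example "localhost:9092".
        props.put("bootstrap.servers", bootstrapServers);
        // Keys are assumed to be plain strings, so the built-in serializer is enough.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are User objects, so we point Kafka at the custom serializer.
        // "com.example.kafka" is a placeholder package name.
        props.put("value.serializer", "com.example.kafka.UserSerializer");
        return props;
    }
}
```

Passing these properties to new KafkaProducer<String, User>(props) yields a producer that runs every value through the custom serializer when send(new ProducerRecord<String, User>("UserMessageTopic", user)) is called.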

2.4.5. UserConsumerThread.java

The consumer thread. It polls the broker for messages and prints them to the console.

Note that we set the consumer's value.deserializer property to our custom deserializer.
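
On the consumer side the registration looks much the same. In the sketch below, the package name com.example.kafka, the helper class UserConsumerConfig, the group id, and the String key deserializer are hypothetical values chosen for illustration.

```java
import java.util.Properties;

// Hypothetical helper that collects the consumer settings in one place.
class UserConsumerConfig {
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Address of the Kafka broker, for example "localhost:9092".
        props.put("bootstrap.servers", bootstrapServers);
        // Consumers that share a group id split the topic's partitions between them.
        props.put("group.id", groupId);
        // Keys are assumed to be plain strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values are turned back into User objects by the custom deserializer.
        // "com.example.kafka" is a placeholder package name.
        props.put("value.deserializer", "com.example.kafka.UserDeserializer");
        return props;
    }
}
```

A KafkaConsumer<String, User> created from these properties can subscribe to UserMessageTopic and call poll in a loop; each record's value() then arrives as a ready-made User object.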

2.4.6. KakfaCustomSerializerMain.java

The entry point class, which initializes the producer and consumer threads for testing.

2.4.7. Run the application

Step 1. Execute the script create-topics.sh (or create-topics.bat on Windows) to create the UserMessageTopic topic on the Kafka broker.

Step 2. Open KakfaCustomSerializerMain.java in Eclipse. Right click –> Run As –> Java Application, or use the shortcut Alt+Shift+X, J to start the main method.

The output in my Eclipse console is shown below:

Sent:User [id=1, userName=tom, firstName=Tom, lastName=Riddle, age=40]
Sent:User [id=2, userName=harry, firstName=Harry, lastName=Potter, age=10]
Receive: User [id=1, userName=tom, firstName=Tom, lastName=Riddle, age=40]
Receive: User [id=2, userName=harry, firstName=Harry, lastName=Potter, age=10]

We can see that the users sent by the producer were received by the consumer.

2.5. Conclusion

The above is a basic example of how to write a custom serializer in Apache Kafka. The example only implements a custom serializer/deserializer for the value. You can use it as a basis for implementing a custom serializer for the key. Currently, Apache Kafka provides built-in serializers/deserializers for the following types:

  • ByteArray
  • Integer
  • Long
  • String

If these are not enough for your purpose, you can implement your own. Before doing that, however, have a look at serialization libraries such as Avro, Apache Thrift, or Protocol Buffers.

Below are the articles related to Apache Kafka. If you’re interested in them, you can refer to the following links:

Apache Kafka Tutorial

Getting started with Apache Kafka 0.9

Apache Kafka 0.9 Java Client API Example

Using Apache Kafka Docker

Create Multi-threaded Apache Kafka Consumer

Apache Kafka Command Line Interface

Write An Apache Kafka Custom Partitioner

Apache Kafka Connect Example

Apache Kafka Connect MQTT Source Tutorial

Apache Flume Kafka Source And HDFS Sink Tutorial

Spring Kafka Tutorial
