To continue the series about Apache Kafka, I’d like to share how to write a custom serializer in Apache Kafka.
1. Why do we need a custom serializer in Apache Kafka?
Apache Kafka allows us to send messages with different data types to the broker: strings, numbers, arrays, and any other kind of object. Serializers prepare a message for transmission from the producer to the broker. More precisely, a serializer tells the producer how to convert the message into a byte array before transmitting it to the broker. Conversely, deserializers are used by the consumer to convert the byte array back into an object.
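As a minimal illustration of this contract (not part of the example project), a serializer/deserializer pair is simply two inverse functions between an object and a byte array. For a plain string, the round trip looks like this:

```java
import java.nio.charset.StandardCharsets;

public class RoundTripSketch {
    // A "serializer": turns the message into bytes for the wire.
    static byte[] serialize(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // The matching "deserializer": turns the bytes back into the message.
    static String deserialize(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello broker");
        System.out.println(deserialize(wire)); // prints: hello broker
    }
}
```

A custom serializer for Kafka does exactly this, except that it implements Kafka's `Serializer`/`Deserializer` interfaces so the client can call it automatically, as shown later in this post.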
2. How to write a custom serializer in Apache Kafka.
2.1. Prerequisites
- Apache Kafka 0.9 broker installed on your local machine or a remote one. You can refer to this link for setting it up.
- JDK 7/8 installed on your development PC.
- Eclipse 4 (I am using Eclipse Mars 4.5)
- Maven 3
2.2. Source code structure
The example source code is available on GitHub. You can use git to clone the repository to your PC, or simply download the zip version of the example and extract it.
Once you have the source code, you can import it into Eclipse and run the test.
To import:
- Menu File –> Import –> Maven –> Existing Maven Projects
- Browse to your source code location
- Click the Finish button to complete the import
Here is the project structure in my Eclipse:
2.3. Maven pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.howtoprogram</groupId>
  <artifactId>kafka-custom-serializer-example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>Kafka-Custom-Serializer-Example</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.7.4</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
2.4. Classes Descriptions
The source code includes six classes:
- User.java: encapsulates user information. The producer creates and sends User objects to the broker while the consumer receives them.
- UserSerializer.java: converts a User object to a byte array.
- UserDeserializer.java: converts a byte array back to a User object.
- UserProducerThread.java: the producer class.
- UserConsumerThread.java: the consumer class.
- KakfaCustomSerializerMain.java: the entry point, used for testing purposes.
2.4.1. User.java
package com.howtoprogram.kafka.customserializer;

public class User {

  private Long id;
  private String userName;
  private String firstName;
  private String lastName;
  private int age;

  public User() {
  }

  public User(Long id, String userName, String firstName, String lastName, int age) {
    super();
    this.id = id;
    this.userName = userName;
    this.firstName = firstName;
    this.lastName = lastName;
    this.age = age;
  }

  /*
   * (non-Javadoc)
   *
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    return "User [id=" + id + ", userName=" + userName + ", firstName=" + firstName
        + ", lastName=" + lastName + ", age=" + age + "]";
  }

  // getters and setters
}
2.4.2. UserSerializer.java
package com.howtoprogram.kafka.customserializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class UserSerializer implements Serializer<User> {

  @Override
  public void close() {
    // nothing to clean up
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // no configuration needed
  }

  @Override
  public byte[] serialize(String topic, User data) {
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      // Serialize the User to a JSON string, then encode it explicitly as
      // UTF-8 bytes (rather than relying on the platform default charset).
      retVal = objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
  }
}
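To see what this serializer actually puts on the wire, here is a standalone sketch (pure JDK, no Kafka or Jackson; the JSON is hand-rolled and simplified to three fields, purely for illustration) producing the same kind of payload:

```java
import java.nio.charset.StandardCharsets;

public class UserJsonSketch {
    // Hand-rolled JSON, approximating what ObjectMapper would emit for a User
    // (simplified: firstName/lastName omitted to keep the sketch short).
    static byte[] toJsonBytes(long id, String userName, int age) {
        String json = "{\"id\":" + id
            + ",\"userName\":\"" + userName + "\""
            + ",\"age\":" + age + "}";
        // Encode explicitly as UTF-8, as a serializer should.
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(new String(toJsonBytes(1L, "tom", 40), StandardCharsets.UTF_8));
        // prints: {"id":1,"userName":"tom","age":40}
    }
}
```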
2.4.3. UserDeserializer.java
package com.howtoprogram.kafka.customserializer;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class UserDeserializer implements Deserializer<User> {

  @Override
  public void close() {
    // nothing to clean up
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // no configuration needed
  }

  @Override
  public User deserialize(String topic, byte[] data) {
    ObjectMapper mapper = new ObjectMapper();
    User user = null;
    try {
      // Parse the JSON bytes back into a User object.
      user = mapper.readValue(data, User.class);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return user;
  }
}
2.4.4. UserProducerThread.java
package com.howtoprogram.kafka.customserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class UserProducerThread implements Runnable {

  private final KafkaProducer<String, User> producer;
  private final String topic;

  public UserProducerThread(String brokers, String topic) {
    Properties prop = createProducerConfig(brokers);
    this.producer = new KafkaProducer<String, User>(prop);
    this.topic = topic;
  }

  private static Properties createProducerConfig(String brokers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.howtoprogram.kafka.customserializer.UserSerializer");
    return props;
  }

  @Override
  public void run() {
    List<User> users = new ArrayList<>();
    users.add(new User(1L, "tom", "Tom", "Riddle", 40));
    users.add(new User(2L, "harry", "Harry", "Potter", 10));
    for (User user : users) {
      producer.send(new ProducerRecord<String, User>(topic, user.getUserName(), user),
          new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
              if (e != null) {
                e.printStackTrace();
              }
              System.out.println("Sent:" + user.toString());
            }
          });
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    // closes producer
    producer.close();
  }
}
An important point here is that we registered the value.serializer property of the producer with our custom serializer:
props.put("value.serializer", "com.howtoprogram.kafka.customserializer.UserSerializer");
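The registration itself is plain configuration, so the serializer-related part of the producer setup can be sketched in isolation (class names as in this example project):

```java
import java.util.Properties;

public class SerializerConfigSketch {
    // Builds only the serializer-related part of the producer configuration.
    static Properties serializerProps() {
        Properties props = new Properties();
        // Keys stay plain strings, so the built-in StringSerializer is enough.
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // Values are User objects, so we plug in the custom serializer by class name.
        props.put("value.serializer",
            "com.howtoprogram.kafka.customserializer.UserSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(serializerProps().getProperty("value.serializer"));
    }
}
```

Alternatively, the Kafka 0.9 client also offers KafkaProducer constructors that accept serializer instances directly, e.g. `new KafkaProducer<>(props, new StringSerializer(), new UserSerializer())`, in which case these two properties are not needed.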
2.4.5. UserConsumerThread.java
package com.howtoprogram.kafka.customserializer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UserConsumerThread implements Runnable {

  private final KafkaConsumer<String, User> consumer;
  private final String topic;

  public UserConsumerThread(String brokers, String groupId, String topic) {
    Properties prop = createConsumerConfig(brokers, groupId);
    this.consumer = new KafkaConsumer<>(prop);
    this.topic = topic;
    this.consumer.subscribe(Arrays.asList(this.topic));
  }

  private static Properties createConsumerConfig(String brokers, String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.howtoprogram.kafka.customserializer.UserDeserializer");
    return props;
  }

  @Override
  public void run() {
    while (true) {
      ConsumerRecords<String, User> records = consumer.poll(100);
      for (final ConsumerRecord<String, User> record : records) {
        System.out.println("Receive: " + record.value().toString());
      }
    }
  }
}
This is the consumer thread. It polls the broker for messages and prints them to the console.
Note that we registered the value.deserializer property of the consumer with our custom deserializer:
props.put("value.deserializer", "com.howtoprogram.kafka.customserializer.UserDeserializer");
2.4.6. KakfaCustomSerializerMain.java
package com.howtoprogram.kafka.customserializer;

public final class KakfaCustomSerializerMain {

  public static void main(String[] args) {
    String brokers = "localhost:9092";
    String groupId = "group01";
    String topic = "UserMessageTopic10";

    if (args != null && args.length == 3) {
      brokers = args[0];
      groupId = args[1];
      topic = args[2];
    }

    // Start User Producer Thread
    UserProducerThread producerThread = new UserProducerThread(brokers, topic);
    Thread t1 = new Thread(producerThread);
    t1.start();

    // Start group of User Consumer Thread
    UserConsumerThread consumerThread = new UserConsumerThread(brokers, groupId, topic);
    Thread t2 = new Thread(consumerThread);
    t2.start();

    try {
      Thread.sleep(100000);
    } catch (InterruptedException ie) {
    }
  }
}
This is the entry point class, which initializes the producer and consumer threads for testing purposes.
2.4.7. Run the application.
Step 1. Execute the script create-topics.sh (or create-topics.bat on Windows) to create the UserMessageTopic topic on the Kafka broker.
Step 2. Open KakfaCustomSerializerMain.java in Eclipse. Right-click –> Run As –> Java Application, or use the shortcut Alt+Shift+X, J to start the main method.
The output in my Eclipse is as below:
Sent:User [id=1, userName=tom, firstName=Tom, lastName=Riddle, age=40]
Sent:User [id=2, userName=harry, firstName=Harry, lastName=Potter, age=10]
Receive: User [id=1, userName=tom, firstName=Tom, lastName=Riddle, age=40]
Receive: User [id=2, userName=harry, firstName=Harry, lastName=Potter, age=10]
We can see that the users sent by the producer were received by the consumer.
2.5 Conclusion.
Above is a basic example of how to write a custom serializer in Apache Kafka. The example implemented the custom serializer/deserializer for the value only; you can build on it to implement a custom serializer for the key as well. Currently, Apache Kafka ships with built-in serializers/deserializers for the following types:
- ByteArray
- Integer
- Long
- String
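For instance, Kafka's built-in Long serializer writes the value as 8 bytes in big-endian (network) order; its encoding can be sketched with plain JDK classes:

```java
import java.nio.ByteBuffer;

public class LongCodecSketch {
    // Same wire format as Kafka's LongSerializer: 8 big-endian bytes.
    static byte[] serialize(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    // The inverse, matching LongDeserializer.
    static long deserialize(byte[] payload) {
        return ByteBuffer.wrap(payload).getLong();
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize(42L))); // prints: 42
    }
}
```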
If these are not sufficient for your purposes, you can implement your own. However, before doing that, it is worth looking at serialization libraries such as Avro, Apache Thrift, or Protocol Buffers.
If you’re interested in more articles related to Apache Kafka, you can refer to the following links:
Getting started with Apache Kafka 0.9
Apache Kafka 0.9 Java Client API Example
Create Multi-threaded Apache Kafka Consumer
Apache Kafka Command Line Interface
Write An Apache Kafka Custom Partitioner
Apache Kafka Connect MQTT Source Tutorial
Apache Flume Kafka Source And HDFS Sink Tutorial