Continuing the series about Apache Kafka, in this post I'd like to share some knowledge about Apache Kafka topic partitions and how to write an Apache Kafka custom partitioner.
1. Basics of the Apache Kafka Topic Partition.
There are many reasons why Apache Kafka is being adopted and used more widely today. Two of them, both related to the topic partition, are worth mentioning in this post:
- Scalable
- High Performance
A Kafka topic is like topics in other traditional messaging brokers. It's simply a category, a feed name, a pipe to which messages can be published. There are a few points we should note: by default, Apache Kafka stores topic data directly on disk rather than in a relational database, and within a consumer group, each partition is consumed by only one consumer, so an unpartitioned topic can feed at most one consumer per group. These points lead to the following concerns:
- The topic data cannot be scaled out beyond a single machine
- If the producer rate is higher than the consumer rate, then the consumer tends to fall further behind the producer day by day.
Apache Kafka solves this by introducing the concept of topic partitions. A topic can be divided into many partitions depending on our application's business logic. Partition data can be stored on different machines of the cluster, and each partition of the topic can be consumed by a different consumer of a consumer group. This improves both storage scalability and the ingestion rate of the consumer group.
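To make the parallelism point concrete, here is a small broker-free sketch that spreads partitions over the consumers of a group in a round-robin way. This is only an illustration of the idea; in real Kafka the assignment is performed by the consumer client's configured assignor (range or round-robin), not by application code:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Assign each partition to a consumer in round-robin order, so a group
    // of C consumers can read an N-partition topic in parallel.
    static Map<Integer, Integer> assignPartitions(int numPartitions, int numConsumers) {
        Map<Integer, Integer> assignment = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            assignment.put(p, p % numConsumers); // partition -> consumer index
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions spread over 3 consumers: each consumer owns 2 partitions
        System.out.println(assignPartitions(6, 3));
    }
}
```

With 6 partitions and 3 consumers, each consumer ends up owning 2 partitions, which is exactly why adding partitions lets a consumer group keep up with a faster producer.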
2. Write An Apache Kafka Custom Partitioner.
By default, the Apache Kafka producer distributes messages without a key across partitions in a round-robin fashion, and hashes the key to pick a partition when a key is present. We can change this behavior by plugging in our own custom partitioner. In the example below, assume that we're implementing a basic notification application which allows users to subscribe to receive notifications from other users. We will create a Kafka topic with many partitions, and each partition will hold the messages/notifications of one user.
Note that the example implementation will not create multiple producers and consumers as depicted in the diagram above. We just use one producer and one consumer with a custom partitioner.
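Before replacing the default behavior, it helps to see roughly what it does for keyed messages. Kafka's real DefaultPartitioner hashes the key bytes with murmur2 and takes the result modulo the partition count; the sketch below illustrates the same idea in a dependency-free way (the hash function and the partition count of 6 are assumptions for this illustration, not Kafka's actual implementation):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DefaultPartitionSketch {

    // Simplified stand-in for default keyed partitioning: hash the key
    // bytes and take the result modulo the partition count. (Kafka's real
    // DefaultPartitioner uses murmur2; Arrays.hashCode is used here only
    // to keep the sketch dependency-free.)
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Mask off the sign bit so the result is a valid partition index
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String user : new String[] {"Tom", "Mary", "Alice"}) {
            System.out.println(user + " -> partition " + partitionForKey(user, 6));
        }
    }
}
```

The key property is that the same key always lands on the same partition, but which partition a given user gets is arbitrary. Our custom partitioner below replaces this arbitrary mapping with an explicit user-id-to-partition mapping.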
2.1. Prerequisites
- Apache Kafka 0.9 broker installed on your local machine or a remote one. You can refer to this link for setting it up.
- JDK 7/8 installed on your development PC.
- Eclipse 4 (I am using Eclipse Mars 4.5)
- Maven 3
2.2. Source code structure
The example source code is available on GitHub. You can use git to clone the repository to your PC, or simply download the zip version of the example and extract it.
After having the source code, you can import the source code into Eclipse and run the test.
To import:
- Menu File –> Import –> Maven –> Existing Maven Projects
- Browse to your source code location
- Click Finish button to finish the importing
Here is the project structure in my Eclipse:
2.3. Maven Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.howtoprogram</groupId>
  <artifactId>kafka-multithreaded-java-example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>Kafka-MultiThread-Java-Example</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.1</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
2.4. Classes Descriptions
The source code includes 6 classes:
- IUserService.java, UserServiceImpl.java: these classes handle operations related to users. Currently they provide two methods, findUserId and findAllUsers, over a list of 5 users: Tom, Mary, Alice, Daisy, Helen.
- UserProducerThread.java: a simple producer that produces 5 messages to the UserMessageTopic topic.
- UserConsumerThread.java: a simple consumer. It subscribes to the UserMessageTopic topic and prints the received messages, including their partition information, to the console.
- KafkaUserCustomPartitioner.java: our custom partitioner, which distributes the messages to partitions based on user id.
- KafkaCustomPartitionerMain.java: the entry point class. It initializes the producer and consumer for our testing purpose.
2.4.1. IUserService.java, UserServiceImpl.java
package com.howtoprogram.kafka.custompartitioner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UserServiceImpl implements IUserService {

  // Pairs of username and id
  private Map<String, Integer> usersMap;

  public UserServiceImpl() {
    usersMap = new HashMap<>();
    usersMap.put("Tom", 1);
    usersMap.put("Mary", 2);
    usersMap.put("Alice", 3);
    usersMap.put("Daisy", 4);
    usersMap.put("Helen", 5);
  }

  @Override
  public Integer findUserId(String userName) {
    return usersMap.get(userName);
  }

  @Override
  public List<String> findAllUsers() {
    return new ArrayList<>(usersMap.keySet());
  }
}
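The IUserService interface itself is not listed in the post; based on the two methods described, a minimal version would look like this (a sketch, not necessarily the exact file from the repository):

```java
import java.util.List;

// Belongs in package com.howtoprogram.kafka.custompartitioner,
// alongside UserServiceImpl above.
public interface IUserService {

    // Returns the user's id, or null if the username is unknown
    Integer findUserId(String userName);

    // Returns the usernames of all known users
    List<String> findAllUsers();
}
```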
2.4.2. UserProducerThread.java
package com.howtoprogram.kafka.custompartitioner;

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class UserProducerThread implements Runnable {

  private final KafkaProducer<String, String> producer;
  private final String topic;
  private IUserService userService;

  public UserProducerThread(String brokers, String topic) {
    Properties prop = createProducerConfig(brokers);
    this.producer = new KafkaProducer<String, String>(prop);
    this.topic = topic;
    userService = new UserServiceImpl();
  }

  private static Properties createProducerConfig(String brokers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("partitioner.class",
        "com.howtoprogram.kafka.custompartitioner.KafkaUserCustomPartitioner");
    return props;
  }

  @Override
  public void run() {
    System.out.println("Produces 5 messages");
    List<String> users = userService.findAllUsers();
    for (String user : users) {
      String msg = "Hello " + user;
      producer.send(new ProducerRecord<String, String>(topic, user, msg), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
          if (e != null) {
            e.printStackTrace();
          }
          System.out.println("Sent:" + msg + ", User: " + user
              + ", Partition: " + metadata.partition());
        }
      });
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    // closes the producer
    producer.close();
  }
}
The key line is the one that registers our custom partitioner with the producer:

props.put("partitioner.class",
    "com.howtoprogram.kafka.custompartitioner.KafkaUserCustomPartitioner");
2.4.3. KafkaUserCustomPartitioner.java
package com.howtoprogram.kafka.custompartitioner;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class KafkaUserCustomPartitioner implements Partitioner {

  private IUserService userService;

  public KafkaUserCustomPartitioner() {
    userService = new UserServiceImpl();
  }

  @Override
  public void configure(Map<String, ?> configs) {
  }

  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value,
      byte[] valueBytes, Cluster cluster) {
    int partition = 0;
    String userName = (String) key;
    // Find the id of the current user based on the username
    Integer userId = userService.findUserId(userName);
    // If the userId is not found, the default partition is 0
    if (userId != null) {
      partition = userId;
    }
    return partition;
  }

  @Override
  public void close() {
  }
}
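Because partition() above only looks at the key and ignores the Cluster argument, the mapping can be sanity-checked without a running broker. The sketch below reproduces the same lookup logic as a plain, self-contained Java class (the class and method names here are illustrative, not part of the example project):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionMappingCheck {

    // Same username -> id table as UserServiceImpl
    private static final Map<String, Integer> USERS = new HashMap<>();
    static {
        USERS.put("Tom", 1);
        USERS.put("Mary", 2);
        USERS.put("Alice", 3);
        USERS.put("Daisy", 4);
        USERS.put("Helen", 5);
    }

    // Mirrors the custom partitioner's partition() method: the user's id
    // is the partition, and unknown users fall back to partition 0.
    static int partitionFor(String userName) {
        Integer userId = USERS.get(userName);
        return (userId != null) ? userId : 0;
    }

    public static void main(String[] args) {
        System.out.println("Tom -> " + partitionFor("Tom"));   // 1
        System.out.println("Bob -> " + partitionFor("Bob"));   // 0 (unknown user)
    }
}
```

One consequence worth noting: since the ids run from 1 to 5 and unknown users land on partition 0, the UserMessageTopic topic must be created with at least 6 partitions for this scheme to work.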
2.4.4. UserConsumerThread.java
package com.howtoprogram.kafka.custompartitioner;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UserConsumerThread implements Runnable {

  private final KafkaConsumer<String, String> consumer;
  private final String topic;

  public UserConsumerThread(String brokers, String groupId, String topic) {
    Properties prop = createConsumerConfig(brokers, groupId);
    this.consumer = new KafkaConsumer<>(prop);
    this.topic = topic;
    this.consumer.subscribe(Arrays.asList(this.topic));
  }

  private static Properties createConsumerConfig(String brokers, String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }

  @Override
  public void run() {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (final ConsumerRecord<String, String> record : records) {
        System.out.println("Receive message: " + record.value() + ", Partition: "
            + record.partition() + ", Offset: " + record.offset());
      }
    }
  }
}
2.4.5. KafkaCustomPartitionerMain.java
package com.howtoprogram.kafka.custompartitioner;

public final class KafkaCustomPartitionerMain {

  public static void main(String[] args) {
    String brokers = "localhost:9092";
    String groupId = "group01";
    String topic = "UserMessageTopic";

    if (args != null && args.length == 3) {
      brokers = args[0];
      groupId = args[1];
      topic = args[2];
    }

    // Start the User Producer Thread
    UserProducerThread producerThread = new UserProducerThread(brokers, topic);
    Thread t1 = new Thread(producerThread);
    t1.start();

    // Start the User Consumer Thread
    UserConsumerThread consumerThread = new UserConsumerThread(brokers, groupId, topic);
    Thread t2 = new Thread(consumerThread);
    t2.start();

    try {
      Thread.sleep(100000);
    } catch (InterruptedException ie) {
    }
  }
}
2.4.6. Run the application.
Step 1. Execute the script create-topics.sh (or create-topics.bat on Windows) to create the UserMessageTopic topic on the Kafka broker. Note that the topic needs at least 6 partitions, because the partitioner maps user ids 1-5 to partitions 1-5 and uses partition 0 as the fallback.
Step 2. Open KafkaCustomPartitionerMain.java in Eclipse. Right click –> Run As –> Java Application, or use the shortcut Alt+Shift+X, J to start the main method.
The output in my Eclipse console is as below:
Produces 5 messages
Sent:Hello Tom, User: Tom, Partition: 1
Sent:Hello Alice, User: Alice, Partition: 3
Sent:Hello Daisy, User: Daisy, Partition: 4
Receive message: Hello Tom, Partition: 1, Offset: 0
Receive message: Hello Alice, Partition: 3, Offset: 0
Receive message: Hello Daisy, Partition: 4, Offset: 0
Sent:Hello Helen, User: Helen, Partition: 5
Receive message: Hello Helen, Partition: 5, Offset: 0
Sent:Hello Mary, User: Mary, Partition: 2
Receive message: Hello Mary, Partition: 2, Offset: 0
We can see that messages were delivered and received as expected:
Tom: partition #1.
Alice: partition #3.
Daisy: partition #4.
Helen: partition #5.
Mary: partition #2.
3. Summary
The topic partition is the key unit of parallelism in Apache Kafka. We can use partitions to scale out not only storage but also operations. In this example, we wrote an Apache Kafka custom partitioner which helps distribute user messages to the correct partitions of the topic. This is just a very simple example for reference; hopefully it helps you quickly adapt it to your own purposes.
Below are other articles related to Apache Kafka. If you're interested in them, you can refer to the following links:
Getting started with Apache Kafka 0.9
Apache Kafka 0.9 Java Client API Example
Create Multi-threaded Apache Kafka Consumer
Apache Kafka Command Line Interface
Apache Kafka Connect MQTT Source Tutorial
Apache Flume Kafka Source And HDFS Sink Tutorial