Apache Kafka Connect MQTT Source Tutorial

Continuing the topic of Apache Kafka Connect, I’d like to share how to use the Apache Kafka Connect MQTT Source to move data from an MQTT broker into Apache Kafka. Note that since version 0.9, Apache Kafka has included a feature called Kafka Connect, which allows users to easily integrate Kafka with other data sources. You can refer to my previous post about Apache Kafka Connect for more detail.

Apache Kafka Connect MQTT Source


Prerequisites

  • Apache Kafka 0.10.x installed. If you don’t have it yet, you can refer to my previous post about getting started with Apache Kafka.
  • An MQTT broker up and running. I am using the Mosquitto MQTT broker.

1. Preparation

1.1. Basics of Apache Kafka Connect MQTT Source

This is not an official Apache Kafka connector; it comes from the community. You can find more information and the source code on GitHub. It allows us to move data from an MQTT broker into Apache Kafka. It is still under development. We will pull the source code from its repository, build it, and use it with Apache Kafka.

1.2. Download the Apache Kafka Connect MQTT binary and its dependency

If you want to build the source code yourself, go to step 1.3. If you simply need the binary files, you can download kafka-mqtt-bin.zip and skip step 1.3.

1.3. Get and build the source code

If you are familiar with Git, you can clone the source code from GitHub; otherwise, you can download it directly here.

You will need Gradle to build the source code into a jar file. If you have Gradle installed, extract the zip file, navigate to the extracted directory, and issue the command below to build the source code.

On Windows:

On Linux:

Kafka Connect MQTT also needs the Eclipse Paho client library to “talk” to the MQTT broker. You can get it from this package: kafka-mqtt-bin.

2. Using Apache Kafka Connect MQTT

Assume that Apache Kafka is installed in: /opt/kafka_2.11-

Step 1. Copy the Kafka Connect MQTT library and its dependency into the Kafka distribution

Copy the following files to /opt/kafka_2.11-

  • kafka-connect-mqtt-1.0-SNAPSHOT.jar
  • org.eclipse.paho.client.mqttv3-1.0.2.jar

The connector needs the Paho client to communicate with the MQTT broker.
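If you repeat this setup often, the copy can be scripted. The minimal sketch below uses only Python's standard library. The source directory (wherever you built or unzipped the jars) and the destination directory are parameters, since the exact Kafka path depends on your installation; the jar names are the ones listed above and may differ if you build another version. It checks that both jars exist before copying either, so you never end up with only the connector and not its Paho dependency.

```python
import shutil
from pathlib import Path

# Jar names as listed in Step 1; adjust if your build produces other versions.
REQUIRED_JARS = (
    "kafka-connect-mqtt-1.0-SNAPSHOT.jar",
    "org.eclipse.paho.client.mqttv3-1.0.2.jar",
)


def copy_connector_jars(source_dir, kafka_dir, jars=REQUIRED_JARS):
    """Copy every jar from source_dir into kafka_dir and return the new paths.

    Raises FileNotFoundError (naming all missing jars) before copying
    anything, so a partial copy cannot happen.
    """
    source = Path(source_dir)
    target = Path(kafka_dir)
    missing = [name for name in jars if not (source / name).is_file()]
    if missing:
        raise FileNotFoundError(f"missing jars: {', '.join(missing)}")
    target.mkdir(parents=True, exist_ok=True)
    # copy2 also preserves file timestamps.
    return [Path(shutil.copy2(source / name, target / name)) for name in jars]
```

Call it with the directory holding the jars and your Kafka directory, for example `copy_connector_jars("kafka-mqtt-bin", "/path/to/kafka")`, where both paths are placeholders.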

Step 2. Create a configuration file for the Kafka MQTT connector

In /opt/kafka_2.11-, create a file named mqtt.properties with the following content:

We name the connector mqtt, set the connector class to MqttSourceConnector, and set the maximum number of tasks to 1. The Kafka topic to which data from the MQTT broker will be published is hello-mqtt-kafka. We also specify the MQTT server, which runs on localhost, and the MQTT topic whose data will be consumed and moved to Kafka.

In short, we will move messages from the topic “hello-mqtt” on the MQTT broker to the Apache Kafka topic hello-mqtt-kafka.
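The settings described above can be sketched as a small Python helper that renders them in Java .properties syntax. Only name, connector.class and tasks.max are standard Kafka Connect keys. The connector-specific key names (kafka.topic, mqtt.server_uris, mqtt.topic) and the fully qualified class name are assumptions modeled on the community connector's sample configuration, so check them against the README of the version you built. The MQTT URI assumes the broker listens on the default port 1883.

```python
# Standard Kafka Connect keys plus ASSUMED connector-specific keys;
# verify the assumed names against the connector's own README.
MQTT_SOURCE_CONFIG = {
    "name": "mqtt",
    "connector.class": "com.evokly.kafka.connect.mqtt.MqttSourceConnector",  # assumed package
    "tasks.max": "1",
    "kafka.topic": "hello-mqtt-kafka",           # assumed key name
    "mqtt.server_uris": "tcp://localhost:1883",  # assumed key name; 1883 = default MQTT port
    "mqtt.topic": "hello-mqtt",                  # assumed key name
}


def render_properties(config):
    """Serialize a flat dict as key=value lines (Java .properties syntax)."""
    return "".join(f"{key}={value}\n" for key, value in config.items())


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and #/! comments.

    Handles only the subset written by render_properties: no line
    continuations and no escape sequences.
    """
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config
```

Writing the file is then one call, for example `pathlib.Path("mqtt.properties").write_text(render_properties(MQTT_SOURCE_CONFIG))` run from the Kafka directory.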

Step 3. Make sure your MQTT broker is ready
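A quick way to confirm that the broker is listening is a plain TCP connection test. This minimal sketch uses only the standard library and assumes the broker runs on localhost with the default MQTT port 1883. A successful connection only shows that something accepts connections on that port; it does not perform an MQTT handshake.

```python
import socket


def broker_reachable(host="localhost", port=1883, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout.

    Only checks that the port accepts connections; no MQTT packets are sent.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False
```

For example, `broker_reachable()` should return True once Mosquitto is running with its default listener.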

Step 4. Start Apache Kafka server

First, change to the Apache Kafka installation directory.

Start ZooKeeper

Start Kafka Server
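Assuming the stock Linux scripts under bin/ and the default configuration files under config/ that ship with the Kafka distribution, the helper below builds the ZooKeeper and Kafka start commands for a given installation directory. It is only a sketch for printing the commands or passing them to subprocess; on Windows, the equivalent .bat scripts live under bin\windows.

```python
import shlex
from pathlib import PurePosixPath


def kafka_start_commands(kafka_dir):
    """Return argv lists that start ZooKeeper and then the Kafka broker.

    Assumes the stock Linux scripts and default config files shipped
    with the Kafka distribution.
    """
    base = PurePosixPath(kafka_dir)
    zookeeper = [str(base / "bin" / "zookeeper-server-start.sh"),
                 str(base / "config" / "zookeeper.properties")]
    broker = [str(base / "bin" / "kafka-server-start.sh"),
              str(base / "config" / "server.properties")]
    return [zookeeper, broker]


def as_shell_lines(commands):
    """Render each argv list as a shell-quoted command line."""
    return [shlex.join(cmd) for cmd in commands]
```

For example, `print("\n".join(as_shell_lines(kafka_start_commands("/opt/kafka"))))` prints both commands, with /opt/kafka standing in for your installation directory. Run them in separate terminals, ZooKeeper first, because the broker connects to ZooKeeper at startup.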

Step 5. Start the Apache Kafka Connect MQTT Source

Issue the following command and wait for the connector to start.
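In standalone mode, the command generally takes the form bin/connect-standalone.sh config/connect-standalone.properties mqtt.properties. Rather than watching the log, you can poll the worker's REST interface, which lists registered connectors at GET /connectors. The sketch below assumes the REST interface is on its default port 8083 and that the connector is named mqtt, as in Step 2.

```python
import http.client
import json
import time
import urllib.request


def wait_for_connector(name="mqtt", base_url="http://localhost:8083",
                       timeout=60.0, interval=2.0):
    """Poll GET {base_url}/connectors until `name` is listed.

    Returns True as soon as the connector appears, or False once
    `timeout` seconds pass without seeing it.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(f"{base_url}/connectors",
                                        timeout=interval) as resp:
                if name in json.loads(resp.read().decode("utf-8")):
                    return True
        except (OSError, ValueError, http.client.HTTPException):
            pass  # worker still starting, or response not valid JSON yet
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Calling `wait_for_connector()` in a second terminal blocks until the worker reports the mqtt connector or a minute passes.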

Here is some output on my console:

Step 6. Publish some messages to the topic: hello-mqtt

We will publish some messages to the hello-mqtt topic on the MQTT broker. You can use a command-line client or any other MQTT tool. I’m using MQTTlens, a Google Chrome extension, to publish and subscribe to messages on the MQTT broker.
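To show what a publisher actually sends, here is a minimal Python sketch that speaks MQTT 3.1.1 directly over a socket: it sends CONNECT, checks the CONNACK return code, publishes one QoS 0 message, and disconnects. It assumes a broker on localhost:1883 that accepts anonymous connections, and the client id is a hypothetical value. For real work, a proper client library such as Eclipse Paho is the better choice.

```python
import socket
import struct


def encode_remaining_length(length):
    """Encode the MQTT 3.1.1 'Remaining Length' field (1 to 4 bytes)."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        byte, length = length % 128, length // 128
        if length:
            byte |= 0x80  # continuation bit: another length byte follows
        out.append(byte)
        if not length:
            return bytes(out)


def encode_string(text):
    """MQTT UTF-8 string: 2-byte big-endian length prefix, then the bytes."""
    data = text.encode("utf-8")
    return struct.pack("!H", len(data)) + data


def connect_packet(client_id, keep_alive=60):
    """CONNECT for protocol level 4 (MQTT 3.1.1) with only Clean Session set."""
    variable_header = (encode_string("MQTT") + bytes([0x04, 0x02])
                       + struct.pack("!H", keep_alive))
    body = variable_header + encode_string(client_id)
    return bytes([0x10]) + encode_remaining_length(len(body)) + body


def publish_packet(topic, payload):
    """PUBLISH at QoS 0: no packet identifier and no acknowledgement."""
    body = encode_string(topic) + payload
    return bytes([0x30]) + encode_remaining_length(len(body)) + body


def publish_once(topic, payload, host="localhost", port=1883,
                 client_id="kafkatutorialpub"):  # hypothetical client id
    """Connect, verify CONNACK, publish one QoS 0 message, then disconnect."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(connect_packet(client_id))
        connack = b""
        while len(connack) < 4:  # CONNACK is always 4 bytes in MQTT 3.1.1
            chunk = sock.recv(4 - len(connack))
            if not chunk:
                raise ConnectionError("broker closed the connection")
            connack += chunk
        if connack[0] != 0x20 or connack[3] != 0x00:
            raise ConnectionError(f"CONNECT rejected: {connack!r}")
        sock.sendall(publish_packet(topic, payload))
        sock.sendall(bytes([0xE0, 0x00]))  # DISCONNECT
```

For example, `publish_once("hello-mqtt", b"Hello from MQTT")` sends one message to the topic the connector is subscribed to.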

Step 7. Verify messages on the Apache Kafka topic: hello-mqtt-kafka

Start a consumer and subscribe to the topic hello-mqtt-kafka
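With Kafka 0.10, the bundled console consumer can read the topic through ZooKeeper. The sketch below only builds that command line; it assumes the stock bin/kafka-console-consumer.sh script and ZooKeeper on its default client port 2181 on localhost.

```python
from pathlib import PurePosixPath


def console_consumer_command(kafka_dir, topic="hello-mqtt-kafka",
                             zookeeper="localhost:2181", from_beginning=True):
    """Build the kafka-console-consumer.sh argv (ZooKeeper-based consumer).

    --from-beginning replays messages already in the topic, so records
    published before the consumer started are shown too.
    """
    argv = [str(PurePosixPath(kafka_dir) / "bin" / "kafka-console-consumer.sh"),
            "--zookeeper", zookeeper, "--topic", topic]
    if from_beginning:
        argv.append("--from-beginning")
    return argv
```

Passing the result to `subprocess.run`, or joining it with spaces and pasting it into a terminal, starts the consumer.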

We can see the output on the console as follows:

Note that the current Kafka Connect MQTT connector publishes messages to the Kafka topic as byte arrays. We will try to improve this in another tutorial.
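Until then, the byte arrays can be turned back into text on the consumer side. The sketch below assumes the worker uses Kafka Connect's JsonConverter with schemas enabled (the setting in the stock connect-standalone.properties), in which case each record arrives as an envelope like {"schema": {"type": "bytes", ...}, "payload": "<base64>"}. If your worker uses a different converter, this helper does not apply.

```python
import base64
import json


def decode_connect_record(line):
    """Decode one console-consumer line produced by Connect's JsonConverter.

    Assumes the schemas-enabled envelope {"schema": {...}, "payload": ...}
    in which a BYTES value is carried as a base64 string. Bytes payloads are
    returned as text when valid UTF-8, otherwise as a hex string.
    """
    record = json.loads(line)
    if not isinstance(record, dict):
        return record  # no envelope (e.g. schemas disabled): return as parsed
    schema = record.get("schema") or {}
    payload = record.get("payload")
    if schema.get("type") != "bytes" or not isinstance(payload, str):
        return payload  # not a bytes value: nothing to decode
    raw = base64.b64decode(payload, validate=True)
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw.hex()
```

Feeding each line printed by the console consumer through `decode_connect_record` recovers the original MQTT message text under that assumption.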

3. Conclusion

Moving data from an MQTT broker into Apache Kafka for further processing is in high demand today. Imagine a scenario where sensors send their logs to an MQTT broker and the data is then moved into Apache Kafka for real-time stream processing. In this post, we used Apache Kafka Connect MQTT, a community Kafka connector, to move data from an MQTT broker into Apache Kafka. This is a very basic tutorial, and it needs improvement in several areas, such as the data format and the number of processing tasks. I will try to cover them in other posts.

Below are articles related to Apache Kafka. If you’re interested, you can refer to the following links:

Apache Kafka Tutorial

Getting started with Apache Kafka 0.9

Apache Kafka 0.9 Java Client API Example

Create Multi-threaded Apache Kafka Consumer

How To Write A Custom Serializer in Apache Kafka

Write An Apache Kafka Custom Partitioner

Apache Kafka Connect Example

Apache Kafka Command Line Interface

Apache Flume Kafka Source And HDFS Sink Tutorial