To continue the topic of Apache Kafka Connect, I'd like to share how to use the Apache Kafka Connect MQTT Source to move data from an MQTT broker into Apache Kafka. Note that since version 0.9, Apache Kafka has included a feature called Kafka Connect, which allows users to easily integrate Kafka with other data sources. You can refer to my previous post about Apache Kafka Connect for more detail.
Prerequisites
- Apache Kafka 0.10.x installed. If you don't have it yet, you can refer to my previous post about getting started with Apache Kafka.
- An MQTT broker up and running. I am using the Mosquitto MQTT broker.
1. Preparation
1.1. Basics of the Apache Kafka Connect MQTT Source
This is not an official connector from Apache Kafka; it comes from the community. You can find more information and the source code on GitHub. It allows us to move data from an MQTT broker into Apache Kafka, and it is still under development. We will pull the source code from this repository, build it, and use it with Apache Kafka.
1.2. Download the Apache Kafka Connect MQTT binary and dependency
If you want to build the source code yourself, go to step 1.3. If you simply need the binary files, you can download kafka-mqtt-bin.zip and skip step 1.3.
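After downloading, extract the archive somewhere convenient. A minimal sketch, assuming a standard unzip utility and that the archive sits in your current directory:
# Extract the prebuilt connector jar and its Paho dependency
unzip kafka-mqtt-bin.zip -d kafka-mqtt-bin
ls kafka-mqtt-bin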
1.3. Get and build the source code.
If you are familiar with Git, you can clone the source code from GitHub, or you can download it directly here.
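If you go the Git route, a clone command along these lines should work; note that the repository path below is only inferred from the connector's Java package name (com.evokly.kafka.connect.mqtt), so verify it against the link above:
# Clone the community connector source (repository path assumed)
git clone https://github.com/evokly/kafka-connect-mqtt.git
cd kafka-connect-mqtt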
You will need Gradle to build the source code into a jar file. If you have Gradle installed, simply extract the zip file, navigate to the extracted directory, and issue the command below to build the source code.
On Windows:
.\gradlew.bat clean jar
On Linux:
./gradlew clean jar
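If the build succeeds, the connector jar should land in Gradle's default output directory, something like the following (the exact file name can differ depending on the project version):
ls build/libs/
# kafka-connect-mqtt-1.0-SNAPSHOT.jar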
The Kafka Connect MQTT connector also needs the Eclipse Paho client library to "talk" to the MQTT broker. You can get it in this package: kafka-mqtt-bin
2. Using Apache Kafka Connect MQTT
Assume that Apache Kafka is installed at /opt/kafka_2.11-0.10.0.0
Step 1. Copy Kafka Connect MQTT library and dependency to the Kafka distribution
Copy the following files to /opt/kafka_2.11-0.10.0.0/libs:
- kafka-connect-mqtt-1.0-SNAPSHOT.jar
- org.eclipse.paho.client.mqttv3-1.0.2.jar
The connector needs the Paho client to communicate with the MQTT broker.
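For example, assuming both jars are sitting in your current directory, the copy can be done with:
# Copy the connector and the Paho client onto Kafka's classpath
cp kafka-connect-mqtt-1.0-SNAPSHOT.jar \
   org.eclipse.paho.client.mqttv3-1.0.2.jar \
   /opt/kafka_2.11-0.10.0.0/libs/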
Step 2. Create configuration file for Kafka MQTT connector
In /opt/kafka_2.11-0.10.0.0/config, create a file named mqtt.properties with the following content:
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=hello-mqtt-kafka
mqtt.client_id=mqtt-kafka-123456789
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://localhost:1883
mqtt.topic=hello-mqtt
We have named the connector mqtt, set the connector class to MqttSourceConnector, and set the maximum number of tasks to 1. We specify hello-mqtt-kafka as the Kafka topic to which data from the MQTT broker will be published. We also specify the MQTT server, which is deployed on localhost, and the MQTT topic whose data will be consumed and moved to Kafka.
In short, we will move messages from the topic "hello-mqtt" on the MQTT broker to the Apache Kafka topic hello-mqtt-kafka.
Step 3. Make sure your MQTT broker is ready
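A quick way to check that the broker is reachable is to open a test subscription from another terminal. This assumes the Mosquitto command-line clients are installed and the broker listens on the default port 1883; press Ctrl+C to stop it:
# Subscribe to the test topic to confirm the broker accepts connections
mosquitto_sub -h localhost -p 1883 -t hello-mqtt -v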
Step 4. Start Apache Kafka server
We first change directory to the Apache Kafka installation folder.
cd /opt/kafka_2.11-0.10.0.0
Start ZooKeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka Server
./bin/kafka-server-start.sh config/server.properties &
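If automatic topic creation is disabled on your broker, you may also want to create the destination topic up front. A sketch using Kafka's topic tool with minimal settings:
# Create the destination topic with a single partition and replica
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello-mqtt-kafka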
Step 5. Start the Apache Kafka Connect MQTT source.
Issue the following command and wait:
./bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
Here is some output on my console:
(com.evokly.kafka.connect.mqtt.MqttSourceConnectorConfig:178)
[2016-07-30 03:39:25,922] INFO Initialize transform process properties (com.evokly.kafka.connect.mqtt.MqttSourceConnector:73)
[2016-07-30 03:39:25,932] INFO Created connector mqtt (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2016-07-30 03:39:26,161] INFO [mqtt-kafka-123456789] Connected to Broker (com.evokly.kafka.connect.mqtt.MqttSourceConnector:117)
[2016-07-30 03:39:26,165] INFO [mqtt-kafka-123456789] Subscribe to 'hello-mqtt' with QoS '1' (com.evokly.kafka.connect.mqtt.MqttSourceConnector:129)
[2016-07-30 03:39:26,165] INFO Source task WorkerSourceTask{id=mqtt-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2016-07-30 03:39:26,645] INFO Reflections took 2112 ms to scan 63 urls, producing 2899 keys and 21708 values (org.reflections.Reflections:229)
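Besides reading the log, you can ask the Connect worker itself whether the connector is loaded. The standalone worker exposes a small REST interface, by default on port 8083 (check your connect-standalone.properties if you changed it):
# List the connectors registered on the standalone worker
curl http://localhost:8083/connectors
# ["mqtt"]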
Step 6. Publish some messages to the topic: hello-mqtt
We will publish some messages to the hello-mqtt topic on the MQTT broker. You can use the command line (see the example after the message list below) or a GUI tool. I'm currently using MQTTlens, a Google Chrome extension, to publish and subscribe to messages on the MQTT broker.
Hello
Halo
Salut
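If you prefer the command line over MQTTlens, the Mosquitto client tools can publish the same messages (assuming mosquitto_pub is installed and the broker runs on localhost):
# Publish a few test messages to the MQTT topic the connector subscribes to
mosquitto_pub -h localhost -p 1883 -t hello-mqtt -m "Hello"
mosquitto_pub -h localhost -p 1883 -t hello-mqtt -m "Halo"
mosquitto_pub -h localhost -p 1883 -t hello-mqtt -m "Salut"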
Step 7. Verify messages on the Apache Kafka topic: hello-mqtt-kafka
Start a consumer and subscribe to the topic hello-mqtt-kafka:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic hello-mqtt-kafka
We can see output on the console like the following:
{"schema":{"type":"bytes","optional":false},"payload":"QWxvaGE="}
{"schema":{"type":"bytes","optional":false},"payload":"WGluIENoYW8K"}
{"schema":{"type":"bytes","optional":false},"payload":"MDEwMQo="}
Note that the current Kafka Connect MQTT connector publishes messages to the Kafka topic as byte arrays. We will try to improve this in another tutorial.
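Each payload value is the raw MQTT message encoded as Base64, so you can decode any of them to recover the original text. A quick check, assuming a standard base64 utility:
# Decode one of the payloads from the consumer output
echo "QWxvaGE=" | base64 --decode
# Aloha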
3. Conclusion
Moving data from an MQTT broker to Apache Kafka for further processing is in very high demand today. We can imagine a scenario where all the sensors send their data to the MQTT broker, and the broker then moves the data to Apache Kafka for streaming and real-time processing. In this post, we used Apache Kafka Connect MQTT, a community Kafka connector, to move data from an MQTT broker to Apache Kafka. This is a very basic tutorial and it needs improvement in some areas, such as the data format and the number of processing tasks. I will try to cover them in other posts.
Below are related articles on Apache Kafka. If you're interested, you can refer to the following links:
Getting started with Apache Kafka 0.9
Apache Kafka 0.9 Java Client API Example
Create Multi-threaded Apache Kafka Consumer
How To Write A Custom Serializer in Apache Kafka
Write An Apache Kafka Custom Partitioner
Apache Kafka Command Line Interface
Apache Flume Kafka Source And HDFS Sink Tutorial