To continue the series of Apache Flume tutorials, I’d like to share an example using the Apache Flume Kafka Source and HDFS Sink. A popular use case today is to collect data from various sources and send it to Apache Kafka, where it becomes available for real-time processing and analysis with frameworks like Apache Storm or Apache Spark. However, that is often not enough: some tasks require batch processing rather than real-time processing, or we may simply want to store the data in a central place as a backup for future use cases we don’t yet know about. In such cases, the central place for storing the data can be HDFS, a relational database, or a NoSQL store.
The use case is generally illustrated in the following image:
We can have multiple data sources that produce data to Apache Kafka. We can implement them easily by using Apache Kafka Connect, tools like Apache Flume with appropriate Flume Sources and the Flume Kafka Sink, or simply by writing custom Apache Kafka producers. A small Flume configuration sketch for this producing side is shown below.
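As a quick illustration of that producing side (not part of the main example in this post), here is a minimal sketch of a Flume agent that reads plain-text lines from a netcat source and publishes them to Kafka through the built-in Flume Kafka Sink. The agent name, port, broker address, and topic are assumptions chosen for this sketch:

# Hypothetical agent "producer1": netcat source -> memory channel -> Kafka sink
producer1.sources = netcat-source
producer1.channels = memory-channel
producer1.sinks = kafka-sink

# Listen for plain-text lines on an assumed local port
producer1.sources.netcat-source.type = netcat
producer1.sources.netcat-source.bind = localhost
producer1.sources.netcat-source.port = 44444
producer1.sources.netcat-source.channels = memory-channel

producer1.channels.memory-channel.type = memory

# Built-in Flume Kafka Sink (Flume 1.6); broker address and topic are assumptions
producer1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
producer1.sinks.kafka-sink.brokerList = localhost:9092
producer1.sinks.kafka-sink.topic = hello-kafka-topic
producer1.sinks.kafka-sink.channel = memory-channel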
Once the data is in Apache Kafka, it is easy for real-time processing frameworks like Apache Spark or Apache Storm to consume and process it in real time. The results can be stored back to HDFS or in any appropriate database.
As mentioned above, besides feeding the data to real-time frameworks, we also need to store it in a single central place for further processing or batch processing. In this post, we will save data from Apache Kafka to HDFS. We can achieve this with different tools: Apache Kafka Connect with an HDFS connector, Apache Flume, or a custom Kafka-to-HDFS consumer. In this post, we will use the second approach: the Apache Flume Kafka Source and HDFS Sink. Note that the Kafka Source and HDFS Sink are built-in components of Apache Flume, so we don’t need any further customization to achieve our purpose.
I recently published a tutorial on using the Apache Flume HDFS Sink; you may want to refer to it.
1. Preparation
1.1. More about the example
We will have an Apache Kafka topic to which we can feed data. We will also set up a Flume agent with an Apache Kafka Source and an HDFS Sink to write the data to HDFS.
1.2. Environment
- Apache Flume 1.6.0
- Apache Kafka 0.10.0.0
- Hadoop 2.6.0
You can use the Cloudera Quickstart VM. The current version is CDH 5.7, which already has the Apache Hadoop ecosystem, including Apache Flume, installed.
You will need Apache Kafka 0.10.0.0 installed on your machine. You can refer to my recent post, Getting started with Apache Kafka, to get it installed.
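If the topic does not exist yet, you can create it with the Kafka command line tools before starting the agent. The command below is a sketch that assumes a single local broker and ZooKeeper listening on localhost:2181; adjust the ZooKeeper address and the partition and replication settings to your environment:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello-kafka-topic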
1.3. Create a configuration for Flume Agent
Create a file named flume-kafka-source-hdfs-sink.conf in the folder /usr/lib/flume-ng/conf with the following content:
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = localhost:2191
agent1.sources.kafka-source.topic = hello-kafka-topic
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100

agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/%{topic}/%y-%m-%d
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel

agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink
Let’s take a look at the file.
As for the Apache Flume Kafka Source, we have defined:
- The type of the source: org.apache.flume.source.kafka.KafkaSource.
- The ZooKeeper connection string used by the Apache Kafka brokers: agent1.sources.kafka-source.zookeeperConnect = localhost:2191.
- The topic from which messages will be consumed by the agent: agent1.sources.kafka-source.topic = hello-kafka-topic.
- The consumer group ID that the agent uses when acting as a Kafka consumer: agent1.sources.kafka-source.groupId = flume
- The channel into which the messages will be written: agent1.sources.kafka-source.channels = memory-channel
As for the Apache Flume HDFS Sink, we have defined:
- The type of the sink, which is an HDFS Sink: agent1.sinks.hdfs-sink.type = hdfs
- The HDFS path where the messages will be stored: agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/%{topic}/%y-%m-%d. Each topic is saved in its own directory, with a subdirectory per date: the %{topic} escape is resolved from the topic header that the Kafka Source adds to every event, and %y-%m-%d from the timestamp header added by the timestamp interceptor.
- The roll settings: agent1.sinks.hdfs-sink.hdfs.rollInterval = 5 rolls a new file every 5 seconds, while rollSize = 0 and rollCount = 0 disable size-based and count-based rolling.
- The HDFS file type: agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
- And the channel from which messages can be read: agent1.sinks.hdfs-sink.channel = memory-channel
As for the channel, we use a memory channel between the Flume Kafka Source and the Flume HDFS Sink: agent1.channels.memory-channel.type = memory
2. Run the Apache Flume Kafka Source and HDFS Sink agent.
2.1. Start the Agent
Go to the folder where Apache Flume is installed. In my Cloudera Quickstart VM, it is /usr/lib/flume-ng:
cd /usr/lib/flume-ng
Start the agent by issuing the following command:
./bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-source-hdfs-sink.conf --name agent1
2.2. Produce some messages to the hello-kafka-topic
We can use the Apache Kafka command line tool to produce some messages to the topic. For example:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello-kafka-topic
Enter some messages like below, line by line:
Hello
Halo
Salut
2.3. Verify the result
We will verify the result by checking the directory hdfs://quickstart.cloudera:8020/tmp/kafka/hello-kafka-topic/ on HDFS. We can do that in many ways, for example by using Hue to browse the directory. In this example, we will use the HDFS command line.
First, we list all the files in the hello-kafka-topic directory:
hadoop fs -ls hdfs://quickstart.cloudera:8020/tmp/kafka/hello-kafka-topic/
The output on my console:
Found 1 items
drwxrwxrwt   - cloudera supergroup          0 2016-08-06 00:29 hdfs://quickstart.cloudera:8020/tmp/kafka/hello-kafka-topic/16-08-06
There is one subfolder: 16-08-06.
We can see the content of this directory by issuing the command below:
hadoop fs -cat hdfs://quickstart.cloudera:8020/tmp/kafka/hello-kafka-topic/16-08-06/*
The output on my console:
hello
halo
salut
3. Conclusion.
We have used the Apache Flume Kafka Source and HDFS Sink to move data from Apache Kafka to HDFS. This is a very basic tutorial; you can improve it depending on your needs. Note that there are other approaches, such as using Apache Kafka Connect with an HDFS connector (a configuration sketch follows below) or simply writing a custom Apache Kafka consumer. However, the Kafka Source and HDFS Sink are supported out of the box by Apache Flume, so they help us reduce our effort. In the next post, I’d like to share how to use other Apache Flume Sources and Sinks.
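For comparison only, here is a minimal sketch of the Kafka Connect alternative mentioned above. It assumes Confluent’s HDFS Sink Connector is available and run in standalone mode; the connector class and property values below are assumptions based on that connector, not something used in this tutorial:

# Hypothetical Kafka Connect standalone connector properties (Confluent HDFS Sink Connector assumed)
name=hdfs-sink-example
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hello-kafka-topic
# HDFS namenode URL and the number of records written per file are assumptions
hdfs.url=hdfs://quickstart.cloudera:8020
flush.size=1000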
Below are articles related to Apache Kafka. If you’re interested, you can refer to the following links:
Getting started with Apache Kafka 0.9
Apache Kafka 0.9 Java Client API Example
How To Write A Custom Serializer in Apache Kafka
Write An Apache Kafka Custom Partitioner
Apache Kafka Command Line Interface
Apache Kafka Connect MQTT Source Tutorial
Nice tutorial. Can you tell me, in case I have 12 topics in Kafka from which I have to move the data with Flume, how do I customize the Flume configuration to make sure that each topic’s data goes to a separate directory in HDFS? Do we have to use 12 different sources in flume.conf?
Unfortunately, at the time of writing, each Kafka Source can read from only one topic. So we have to use 12 different sources in the flume.conf, as you mentioned. A sketch with two sources is shown below the link. See the documentation on the Flume website:
https://flume.apache.org/FlumeUserGuide.html#kafka-source
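As a rough illustration (not taken from the tutorial above), a configuration with two Kafka Sources could look like the sketch below. The hypothetical topics topic-a and topic-b still land in separate HDFS directories because the sink path contains the %{topic} escape, which the Kafka Source fills in for every event; the same pattern extends to 12 sources:

agent1.sources = kafka-source-a kafka-source-b
agent1.channels = memory-channel
agent1.sinks = hdfs-sink

# One Kafka Source per topic; topic-a and topic-b are hypothetical topic names
agent1.sources.kafka-source-a.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source-a.zookeeperConnect = localhost:2191
agent1.sources.kafka-source-a.topic = topic-a
agent1.sources.kafka-source-a.channels = memory-channel
agent1.sources.kafka-source-a.interceptors = i1
agent1.sources.kafka-source-a.interceptors.i1.type = timestamp

agent1.sources.kafka-source-b.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source-b.zookeeperConnect = localhost:2191
agent1.sources.kafka-source-b.topic = topic-b
agent1.sources.kafka-source-b.channels = memory-channel
agent1.sources.kafka-source-b.interceptors = i1
agent1.sources.kafka-source-b.interceptors.i1.type = timestamp

agent1.channels.memory-channel.type = memory

# A single HDFS Sink is enough: %{topic} keeps each topic in its own directory
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/%{topic}/%y-%m-%d
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel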
Hi, the above content is really very helpful.
After following all the steps, I am not able to see the data/files getting created in HDFS.
Could you please help me on the same.
Thanks in advance.
Varun