To continue the series of Apache Flume tutorials, I'd like to share an example of using the Apache Flume Kafka Source and HDFS Sink. A popular use case today is to collect data from various sources and send it to Apache Kafka, where it is ready for real-time processing and analysis with frameworks like Apache Storm or Apache Spark. However, that is often not enough: some tasks require batch processing rather than real-time processing, or we may simply want to store the data in a central place as a backup for future use cases that we don't yet have a demand for. In such cases, the central place for storing data can be HDFS, a relational database, or a NoSQL store.

The use case is generally described in the following image:

Apache Flume Kafka Source And HDFS Sink - Popular data pipeline use case

We can have multiple data sources that produce data to Apache Kafka. We can implement them easily by using Apache Kafka Connect, tools like Apache Flume with appropriate Flume Sources and the Flume Kafka Sink, or by simply writing custom Apache Kafka consumers and producers.

Once the data is in Apache Kafka, it is easy for real-time processing frameworks like Apache Spark or Apache Storm to consume and process it in real time. The results can be stored back to HDFS or any appropriate database.

As mentioned above, besides feeding the data to real-time frameworks, we will need to store the data in a single central place for further processing or batch processing. In this post, we will try to save the data from Apache Kafka to HDFS. We can achieve that purpose with different tools: Apache Kafka Connect with the HDFS Connector, Apache Flume, or a custom Kafka-to-HDFS consumer that we write ourselves. In this post, we will use the second approach, the Apache Flume Kafka Source and HDFS Sink. Note that the Kafka Source and HDFS Sink are built-in Sources and Sinks of Apache Flume, so we don't need any further customization to achieve our purpose.

There is a recent tutorial of mine on using the Apache Flume HDFS Sink; you may want to refer to it.

1. Preparation

1.1. More about the example

We will have an Apache Kafka topic to which we can feed the data. We will also set up a Flume agent with the Apache Kafka Source and HDFS Sink to write the data to HDFS.

1.2. Environment

  • Apache Flume 1.6.0
  • Apache Kafka 0.10.0.0
  • Hadoop 2.6.0

You can use the Cloudera Quickstart VM. The current version is CDH 5.7, and it already has the Apache Hadoop ecosystem, including Apache Flume, installed.

You will need Apache Kafka 0.10.0.0 installed on your machine. You can refer to my recent post, Getting started with Apache Kafka, to get it installed.

1.3. Create a configuration for Flume Agent

Create a file flume-kafka-source-hdfs-sink.conf in the folder /usr/lib/flume-ng/conf with the following content:
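A minimal configuration along these lines should work; the agent, source, channel, and sink names match the property keys discussed below, and the two properties marked as assumptions are additions of mine that typical setups need:

    # Name the components of the agent
    agent1.sources = kafka-source
    agent1.channels = memory-channel
    agent1.sinks = hdfs-sink

    # Kafka Source: consume messages from the hello-kafka-topic topic
    agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
    agent1.sources.kafka-source.zookeeperConnect = localhost:2191
    agent1.sources.kafka-source.topic = hello-kafka-topic
    agent1.sources.kafka-source.groupId = flume
    agent1.sources.kafka-source.channels = memory-channel

    # HDFS Sink: write messages to HDFS, one directory per topic and per date
    agent1.sinks.hdfs-sink.type = hdfs
    agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/%{topic}/%y-%m-%d
    agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
    # Assumption: use local time for the %y-%m-%d escapes, since the Kafka Source
    # does not add a timestamp header to the events it produces
    agent1.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
    agent1.sinks.hdfs-sink.channel = memory-channel

    # Memory channel between the Kafka Source and the HDFS Sink
    agent1.channels.memory-channel.type = memory
    # Assumption: capacities large enough for the Kafka Source's default batch size
    agent1.channels.memory-channel.capacity = 10000
    agent1.channels.memory-channel.transactionCapacity = 1000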

Let’s take a look at the file.

As for the Apache Flume Kafka Source, we have defined:

  • The type of the source: org.apache.flume.source.kafka.KafkaSource.
  • The ZooKeeper quorum that the Apache Kafka brokers register with: agent1.sources.kafka-source.zookeeperConnect = localhost:2191.
  • The topic from which messages will be consumed by the agent: agent1.sources.kafka-source.topic = hello-kafka-topic.
  • The consumer group ID the agent will use when acting as an Apache Kafka consumer: agent1.sources.kafka-source.groupId = flume
  • The channel into which the messages will be written: agent1.sources.kafka-source.channels = memory-channel

As for the Apache Flume HDFS Sink, we have defined:

  • The type of the sink, which is an HDFS Sink: agent1.sinks.hdfs-sink.type = hdfs
  • The HDFS path where the message will be stored: agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/%{topic}/%y-%m-%d

This way, each topic is saved in its own directory, with a subdirectory per date (see the example path after this list).

  • The HDFS file type:  agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
  • And the channel from which messages can be read: agent1.sinks.hdfs-sink.channel = memory-channel
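For example, a message consumed from hello-kafka-topic on 6 August 2016 would land under a directory like /tmp/kafka/hello-kafka-topic/16-08-06 on HDFS; the file names inside are generated by the sink (FlumeData followed by a timestamp, by default).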

As for the channel, we use a memory channel between the above Flume Kafka Source and Flume HDFS Sink: agent1.channels.memory-channel.type = memory

2. Run the Apache Flume Kafka Source and HDFS Sink agent

2.1. Start the Agent

Go to the folder where Apache Flume is installed. On my Cloudera VM, it is: /usr/lib/flume-ng

Start the agent by issuing the following command:
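A command along these lines should work, assuming the configuration file from section 1.3 and the agent name agent1 used above:

    cd /usr/lib/flume-ng
    bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-source-hdfs-sink.conf \
        --name agent1 -Dflume.root.logger=INFO,console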

2.2. Produce some messages to the hello-kafka-topic

We can use the Apache Kafka command line tools to produce some messages to the topic. For example:
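With the console producer that ships with Apache Kafka, and assuming a broker listening on localhost:9092, run the following from the Kafka installation directory:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello-kafka-topic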

Enter some messages like below, line by line:
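Any plain-text lines will do, for instance:

    Hello from Kafka
    Apache Flume Kafka Source and HDFS Sink
    This message should end up in HDFS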

2.3. Verify the result

We will verify the result by checking the directory hdfs://quickstart.cloudera:8020/tmp/kafka/hello-kafka-topic/ on HDFS. There are many ways to do that, such as using Hue to browse the directory. In this example, we will use the hdfs command line.

First, we list all the files in the hello-kafka-topic directory:
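For example, using the path defined in the sink configuration:

    hdfs dfs -ls /tmp/kafka/hello-kafka-topic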

The output on my console:

There is one subfolder: 16-08-06.

We can see the content of this directory by issuing the command below:
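For example, cat everything under the dated subdirectory (the generated FlumeData file names will differ on your machine):

    hdfs dfs -cat /tmp/kafka/hello-kafka-topic/16-08-06/*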

The output on my console:

3. Conclusion

In this post, we used the Apache Flume Kafka Source and HDFS Sink to move data from Apache Kafka to HDFS. This is a very basic tutorial; you can improve it depending on your needs. Note that there are other approaches, such as using Apache Kafka Connect with the HDFS Connector or writing custom Apache Kafka consumers and producers. However, the Kafka Source and HDFS Sink are supported out of the box by Apache Flume, which helps reduce our effort. In the next post, I'd like to share how to use other Apache Flume Sources and Sinks.

Below are articles related to Apache Kafka. If you're interested in them, you can refer to the following links:

Apache Kafka Tutorial

Getting started with Apache Kafka 0.9

Apache Kafka 0.9 Java Client API Example

How To Write A Custom Serializer in Apache Kafka

Write An Apache Kafka Custom Partitioner

Using Apache Kafka Docker

Apache Kafka Command Line Interface

Apache Kafka Connect Example

Apache Kafka Connect MQTT Source Tutorial
