Yassine Fadhlaoui

Reputation: 344

How to write data in real time to HDFS using Flume?

I am using Flume to store sensor data in HDFS. The data is received through MQTT, and the subscriber posts it in JSON format to the Flume HTTP listener. This currently works, but the problem is that Flume does not write to the HDFS file until I stop it (or until the file size reaches 128 MB). I am using Hive to apply a schema on read, but the resulting Hive table contains only one entry. This is expected, because Flume has not written the newly arrived data to the file that Hive loads.

Is there any way to force Flume to write incoming data to HDFS in near real time, so that I don't need to restart it or resort to small files?

Here is my Flume configuration:

# Name the components on this agent
emsFlumeAgent.sources = http_emsFlumeAgent
emsFlumeAgent.sinks = hdfs_sink
emsFlumeAgent.channels = channel_hdfs

# Describe/configure the source
emsFlumeAgent.sources.http_emsFlumeAgent.type = http
emsFlumeAgent.sources.http_emsFlumeAgent.bind = localhost
emsFlumeAgent.sources.http_emsFlumeAgent.port = 41414

# Describe the sink
emsFlumeAgent.sinks.hdfs_sink.type = hdfs
emsFlumeAgent.sinks.hdfs_sink.hdfs.path = hdfs://localhost:9000/EMS/%{sensor}
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollInterval = 0
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollSize = 134217728
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollCount = 0

#emsFlumeAgent.sinks.hdfs_sink.hdfs.idleTimeout=20
# Use a channel which buffers events in memory
emsFlumeAgent.channels.channel_hdfs.type = memory
emsFlumeAgent.channels.channel_hdfs.capacity = 10000
emsFlumeAgent.channels.channel_hdfs.transactionCapacity = 100

# Bind the source and sinks to the channel
emsFlumeAgent.sources.http_emsFlumeAgent.channels = channel_hdfs 
emsFlumeAgent.sinks.hdfs_sink.channel = channel_hdfs

Upvotes: 1

Views: 683

Answers (1)

Lalit

Reputation: 2014

I think the tricky bit here is that you want to write data to HDFS in near real time but also want to avoid small files (for obvious reasons), and that can be a difficult balance to achieve.

You'll need to find an optimal balance between the following two parameters:

hdfs.rollSize (Default = 1024) - File size to trigger roll, in bytes (0: never roll based on file size)

and

hdfs.batchSize (Default = 100) - Number of events written to file before it is flushed to HDFS

If your data is not likely to reach 128 MB within your preferred time window, you may need to reduce rollSize, but only to the extent that you don't run into the small-files problem.
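As a minimal sketch of what that could look like in your sink section (the 10 MB figure below is only an illustrative assumption, not a recommendation for your workload):

# Hypothetical tuning: roll files at ~10 MB instead of 128 MB
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollSize = 10485760
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollInterval = 0
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollCount = 0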

Since you have not set a batch size on your HDFS sink, you should see HDFS flushes after every 100 records; once the flushed records jointly reach 128 MB, the contents are rolled into a 128 MB file. Is that not happening either? Could you please confirm?
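If you want flushes more often than every 100 events, you could also set the batch size explicitly. The value below is only an assumption to show the knob; note that it should not exceed your channel's transactionCapacity (100 in your config):

# Hypothetical: flush to HDFS after every 10 events instead of the default 100
emsFlumeAgent.sinks.hdfs_sink.hdfs.batchSize = 10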

Hope this helps!

Upvotes: 0
