dhalfageme

Reputation: 1545

Flume + Kafka + HDFS: Split messages

I have the following flume agent configuration to read messages from a kafka source and write them back to a HDFS sink

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 192.168.0.100:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList = 192.168.0.100:9092
tier1.channels.channel1.topic = test
tier1.channels.channel1.zookeeperConnect = 192.168.0.100:2181/kafka
tier1.channels.channel1.parseAsFlumeEvent = false

tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.filePrefix = test-kafka
tier1.sinks.sink1.hdfs.fileSuffix = .avro
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%y-%m-%d
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.rollSize=0

The Kafka message content is Avro data, which is serialized into a file correctly as long as only one Kafka message arrives per polling period.

When two Kafka messages arrive in the same batch, they are grouped into the same HDFS file. Since each Avro message contains both schema and data, the resulting file contains schema + data + schema + data, making it an invalid .avro file.
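
For illustration, here is a sketch of reading one of these rolled files back with the Avro Java API (the path and file name are hypothetical, derived from the sink configuration above). An Avro container file has exactly one header, so the reader consumes the first message's records and then fails on the second embedded schema header:

import java.io.File;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadRolledFile {
    public static void main(String[] args) throws Exception {
        // Hypothetical output file produced by the HDFS sink above.
        File rolled = new File("/tmp/kafka/17-01-01/test-kafka.1484000000000.avro");
        DataFileReader<GenericRecord> reader =
                new DataFileReader<>(rolled, new GenericDatumReader<GenericRecord>());
        // Records from the first Kafka message read back fine...
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        // ...but the loop aborts with an exception (typically an invalid
        // sync marker) once the reader hits the second schema header.
        reader.close();
    }
}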

How can I split the batch so that each Kafka message is written to its own HDFS file?

Thank you

Upvotes: 1

Views: 430

Answers (1)

voldy

Reputation: 387

One approach: let's say the topic carrying your incoming Kafka data is called 'SourceTopic'. You can register a custom sink on this 'SourceTopic':

<FlumeNodeRole>.sinks.<your-sink>.type = net.my.package.CustomSink

In your CustomSink, you can write a method to differentiate incoming messages, split them, and resend them individually to a separate 'DestinationTopic' (a minimal sketch follows). This 'DestinationTopic' can then act as a new Flume source for your file serialization.
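
A minimal sketch of that idea, assuming the Flume SDK and the Kafka producer client are on the classpath. The package net.my.package comes from the answer; the property names destinationTopic and brokerList are placeholders, not part of any library:

package net.my.package;

import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomSink extends AbstractSink implements Configurable {

    private KafkaProducer<byte[], byte[]> producer;
    private String destinationTopic;

    @Override
    public void configure(Context context) {
        // Placeholder property names; defaults match the question's setup.
        destinationTopic = context.getString("destinationTopic", "DestinationTopic");
        Properties props = new Properties();
        props.put("bootstrap.servers", context.getString("brokerList", "192.168.0.100:9092"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event event = channel.take();
            if (event == null) {
                tx.commit();
                return Status.BACKOFF;
            }
            // One Flume event per Kafka message on the destination topic,
            // so every Avro payload stays a self-contained container file.
            producer.send(new ProducerRecord<byte[], byte[]>(destinationTopic, event.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            tx.rollback();
            throw new EventDeliveryException("Failed to forward event", e);
        } finally {
            tx.close();
        }
    }

    @Override
    public synchronized void stop() {
        producer.close();
        super.stop();
    }
}

A second KafkaSource pointed at 'DestinationTopic' would then feed the HDFS sink from the question.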

Refer to the Flume User Guide for more on pipelining Flume agents: https://flume.apache.org/FlumeUserGuide.html

Upvotes: 0
