Reputation: 1545
I have the following Flume agent configuration to read messages from a Kafka source and write them to an HDFS sink:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 192.168.0.100:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList = 192.168.0.100:9092
tier1.channels.channel1.topic = test
tier1.channels.channel1.zookeeperConnect = 192.168.0.100:2181/kafka
tier1.channels.channel1.parseAsFlumeEvent = false
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.filePrefix = test-kafka
tier1.sinks.sink1.hdfs.fileSuffix = .avro
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%y-%m-%d
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.rollSize=0
The Kafka message content is Avro data, which is serialized properly into a file as long as only one Kafka message arrives per polling period.
When two Kafka messages arrive in the same batch, they are grouped into the same HDFS file. Since each Avro message contains both schema and data, the resulting file contains schema + data + schema + data, making it an invalid .avro file.
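For illustration, here is a minimal sketch of one way to verify the problem, assuming the output file has been copied locally and is read with Avro's DataFileReader; on a concatenated file like the one described above it typically fails partway through:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroCheck {
    public static void main(String[] args) throws Exception {
        // Path to the HDFS output file copied locally (placeholder argument)
        File f = new File(args[0]);
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(f, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                System.out.println(reader.next());
            }
        }
        // A file holding two concatenated Avro containers reads the first
        // container fine, then throws once the second header is reached.
    }
}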
How can I split the events so that each Kafka message is written to its own HDFS file?
Thank you
Upvotes: 1
Views: 430
Reputation: 387
One approach: let's say the Kafka topic carrying your incoming source data is called 'SourceTopic'. You can register a custom sink on this 'SourceTopic':
<FlumeNodeRole>.sinks.<your-sink>.type = net.my.package.CustomSink
In your CustomSink, you can write a method that differentiates the incoming messages, splits them, and resends them to a different 'DestinationTopic'. This 'DestinationTopic' can then act as a new Flume source for your file serialization.
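For illustration, here is a minimal sketch of such a sink, built on Flume's AbstractSink API and a Kafka producer. The property names 'brokerList' and 'destinationTopic' are made up for this example (read in configure()), and the per-message splitting logic is left as a stub:

package net.my.package;

import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomSink extends AbstractSink implements Configurable {

    private KafkaProducer<byte[], byte[]> producer;
    private String destinationTopic;

    @Override
    public void configure(Context context) {
        // Example-only property names; define whatever names you like here
        destinationTopic = context.getString("destinationTopic", "DestinationTopic");
        Properties props = new Properties();
        props.put("bootstrap.servers", context.getString("brokerList", "localhost:9092"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = channel.take();
            if (event == null) {
                tx.commit();
                return Status.BACKOFF;
            }
            // Each Flume event body holds one Avro message (schema + data).
            // Forwarding each body as its own Kafka record keeps the messages
            // separate; any finer-grained splitting would happen here.
            producer.send(new ProducerRecord<>(destinationTopic, event.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            tx.rollback();
            throw new EventDeliveryException("Failed to forward event", e);
        } finally {
            tx.close();
        }
    }

    @Override
    public synchronized void stop() {
        producer.close();
        super.stop();
    }
}

A second agent (or another source/sink pair in the same agent) can then consume 'DestinationTopic' and serialize each message to its own file.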
Refer to the Flume User Guide for details on pipelining Flume agents: https://flume.apache.org/FlumeUserGuide.html
Upvotes: 0