Reputation: 92
Is it possible to set up a Flume client-collector structure using an Avro sink/source in the Cloudera Quickstart CDH VM? I know there's no practical use, but I wanted to understand how Flume works with Avro and how I can use the resulting files later with Pig etc.
I tried several configurations, but none of them worked. It seems to me that I need several agents, yet there can only be one in the VM.
What I tried last:
agent.sources = reader avro-collection-source
agent.channels = memoryChannel memoryChannel2
agent.sinks = avro-forward-sink hdfs-sink
#Client
agent.sources.reader.type = exec
agent.sources.reader.command = tail -f /home/flume/avro/source.txt
agent.sources.reader.logStdErr = true
agent.sources.reader.restart = true
agent.sources.reader.channels = memoryChannel
agent.sinks.avro-forward-sink.type = avro
agent.sinks.avro-forward-sink.hostname = 127.0.0.1
agent.sinks.avro-forward-sink.port = 80
agent.sinks.avro-forward-sink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 100
# Collector
agent.sources.avro-collection-source.type = avro
agent.sources.avro-collection-source.bind = 127.0.0.1
agent.sources.avro-collection-source.port = 80
agent.sources.avro-collection-source.channels = memoryChannel2
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = /var/flume/avro
agent.sinks.hdfs-sink.channel = memoryChannel2
agent.channels.memoryChannel2.type = memory
agent.channels.memoryChannel2.capacity = 20000
agent.channels.memoryChannel2.transactionCapacity = 2000
Thanks for any advice!
Upvotes: 0
Views: 3416
Reputation: 3845
I think this can be done. In the example below, source1 is a spooling-directory source whose events are written to an Avro sink (sink2). A second source (source2) is an Avro source listening on the same port, so it receives everything sink2 sends. That gives you the client-collector flow you are looking for within a single agent. Please modify this conf file for your use:
# Sources, channels, and sinks are defined per
# agent name, in this case 'dataplatform'.
dataplatform.sources = source1 source2
dataplatform.channels = channel1 channel3
dataplatform.sinks = sink2 sink3
# For each source, channel, and sink, set standard properties.
dataplatform.sources.source1.type = spooldir
dataplatform.sources.source1.spoolDir = /home/flume/flume-sink-clean/
dataplatform.sources.source1.deserializer.maxLineLength = 1000000
dataplatform.sources.source1.deletePolicy = immediate
dataplatform.sources.source1.batchSize = 10000
dataplatform.sources.source1.decodeErrorPolicy = IGNORE
# Channel Type
dataplatform.channels.channel1.type = FILE
dataplatform.channels.channel1.checkpointDir = /home/flume/flume_file_channel/dataplatform/file-channel/checkpoint
dataplatform.channels.channel1.dataDirs = /home/flume/flume_file_channel/dataplatform/file-channel/data
dataplatform.channels.channel1.write-timeout = 60
dataplatform.channels.channel1.use-fast-replay = true
dataplatform.channels.channel1.transactionCapacity = 1000000
dataplatform.channels.channel1.maxFileSize = 2146435071
dataplatform.channels.channel1.capacity = 100000000
# Describe Sink2
dataplatform.sinks.sink2.type = avro
dataplatform.sinks.sink2.hostname = 127.0.0.1
dataplatform.sinks.sink2.port = 20002
dataplatform.sinks.sink2.batch-size = 10000
# Describe source2
dataplatform.sources.source2.type = avro
dataplatform.sources.source2.bind = 0.0.0.0
dataplatform.sources.source2.port = 20002
# Channel3: Source 2 to Channel3 to Local
dataplatform.channels.channel3.type = FILE
dataplatform.channels.channel3.checkpointDir = /home/flume/flume_file_channel/local/file-channel/checkpoint
dataplatform.channels.channel3.dataDirs = /home/flume/flume_file_channel/local/file-channel/data
dataplatform.channels.channel3.transactionCapacity = 1000000
dataplatform.channels.channel3.checkpointInterval = 30000
dataplatform.channels.channel3.maxFileSize = 2146435071
dataplatform.channels.channel3.capacity = 10000000
# Describe Sink3 (Local File System)
dataplatform.sinks.sink3.type = file_roll
dataplatform.sinks.sink3.sink.directory = /home/flume/flume-sink/
dataplatform.sinks.sink3.sink.rollInterval = 60
dataplatform.sinks.sink3.batchSize = 1000
# Bind the source and sink to the channel
dataplatform.sources.source1.channels = channel1
dataplatform.sources.source2.channels = channel3
dataplatform.sinks.sink2.channel = channel1
dataplatform.sinks.sink3.channel = channel3
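Since the whole flow lives in one agent, a single flume-ng process is enough. A minimal sketch of how it might be started (the config file name and conf directory are assumptions; the directory paths are taken from the config above, and --name must match the 'dataplatform' property prefix):

```shell
# Create the directories the config above expects (assumed paths)
mkdir -p /home/flume/flume-sink-clean \
         /home/flume/flume-sink \
         /home/flume/flume_file_channel/dataplatform/file-channel/checkpoint \
         /home/flume/flume_file_channel/dataplatform/file-channel/data \
         /home/flume/flume_file_channel/local/file-channel/checkpoint \
         /home/flume/flume_file_channel/local/file-channel/data

# Start the single agent; the agent name must match the property prefix
flume-ng agent \
  --conf /etc/flume-ng/conf \
  --conf-file dataplatform.conf \
  --name dataplatform \
  -Dflume.root.logger=INFO,console
```

Files dropped into /home/flume/flume-sink-clean/ should then appear rolled into /home/flume/flume-sink/ after passing through the Avro sink/source hop.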
Upvotes: 1