haggz

Reputation: 92

Flume Avro Sink/Source using Cloudera QuickStart

Is it possible to set up a Flume client-collector structure using an Avro sink and Avro source in the Cloudera QuickStart CDH VM? I know there's no practical use for it, but I want to understand how Flume works with Avro files and how I can use them later with Pig etc.

I tried several configurations, but none of them worked. It seems to me that I need several agents, but there can only be one in the VM.

What I tried last:

    agent.sources = reader avro-collection-source
    agent.channels = memoryChannel memoryChannel2
    agent.sinks = avro-forward-sink hdfs-sink

    # Client
    agent.sources.reader.type = exec
    agent.sources.reader.command = tail -f /home/flume/avro/source.txt

    agent.sources.reader.logStdErr = true
    agent.sources.reader.restart = true


    agent.sources.reader.channels = memoryChannel


    agent.sinks.avro-forward-sink.type = avro
    agent.sinks.avro-forward-sink.hostname = 127.0.0.1
    agent.sinks.avro-forward-sink.port = 80


    agent.sinks.avro-forward-sink.channel = memoryChannel


    agent.channels.memoryChannel.type = memory

    agent.channels.memoryChannel.capacity = 10000
    agent.channels.memoryChannel.transactionCapacity = 100

    # Collector

    agent.sources.avro-collection-source.type = avro
    agent.sources.avro-collection-source.bind = 127.0.0.1
    agent.sources.avro-collection-source.port = 80

    agent.sources.avro-collection-source.channels = memoryChannel2

    agent.sinks.hdfs-sink.type = hdfs
    agent.sinks.hdfs-sink.hdfs.path = /var/flume/avro

    agent.sinks.hdfs-sink.channel = memoryChannel2

    agent.channels.memoryChannel2.type = memory

    agent.channels.memoryChannel2.capacity = 20000
    agent.channels.memoryChannel2.transactionCapacity = 2000

Thanks for any advice!

Upvotes: 0

Views: 3416

Answers (1)

Amar

Reputation: 3845

I think this can be done. In the example below, one source (source1) is a spooling directory source whose events are delivered to an Avro sink (sink2). A second source (source2) is an Avro source listening on the same port that the Avro sink sends to, so the two are linked inside a single agent. This gives you the flow you are looking for. Please modify this conf file to suit your use case:

# Sources, channels, and sinks are defined per
# agent name, in this case 'dataplatform'.
dataplatform.sources  = source1 source2
dataplatform.channels = channel1 channel3
dataplatform.sinks    = sink2 sink3


# For each source, channel, and sink, set standard properties.
dataplatform.sources.source1.type         = spooldir
dataplatform.sources.source1.spoolDir     = /home/flume/flume-sink-clean/
dataplatform.sources.source1.deserializer.maxLineLength = 1000000
dataplatform.sources.source1.deletePolicy = immediate
dataplatform.sources.source1.batchSize    = 10000
dataplatform.sources.source1.decodeErrorPolicy = IGNORE

# Channel Type
dataplatform.channels.channel1.type = FILE
dataplatform.channels.channel1.checkpointDir = /home/flume/flume_file_channel/dataplatform/file-channel/checkpoint
dataplatform.channels.channel1.dataDirs = /home/flume/flume_file_channel/dataplatform/file-channel/data
dataplatform.channels.channel1.write-timeout = 60
dataplatform.channels.channel1.use-fast-replay = true
dataplatform.channels.channel1.transactionCapacity = 1000000
dataplatform.channels.channel1.maxFileSize = 2146435071
dataplatform.channels.channel1.capacity = 100000000


# Describe Sink2
dataplatform.sinks.sink2.type = avro
dataplatform.sinks.sink2.hostname = 0.0.0.0
dataplatform.sinks.sink2.port = 20002
dataplatform.sinks.sink2.batch-size = 10000

# Describe source2
dataplatform.sources.source2.type = avro
dataplatform.sources.source2.bind = 0.0.0.0
dataplatform.sources.source2.port = 20002


# Channel3: Source 2 to Channel3 to Local
dataplatform.channels.channel3.type = FILE
dataplatform.channels.channel3.checkpointDir = /home/flume/flume_file_channel/local/file-channel/checkpoint
dataplatform.channels.channel3.dataDirs = /home/flume/flume_file_channel/local/file-channel/data
dataplatform.channels.channel3.transactionCapacity = 1000000
dataplatform.channels.channel3.checkpointInterval = 30000
dataplatform.channels.channel3.maxFileSize = 2146435071
dataplatform.channels.channel3.capacity = 10000000

# Describe Sink3 (Local File System)
dataplatform.sinks.sink3.type = file_roll
dataplatform.sinks.sink3.sink.directory = /home/flume/flume-sink/
dataplatform.sinks.sink3.sink.rollInterval = 60
dataplatform.sinks.sink3.batchSize = 1000

# Bind the source and sink to the channel
dataplatform.sources.source1.channels = channel1
dataplatform.sources.source2.channels = channel3
dataplatform.sinks.sink2.channel = channel1
dataplatform.sinks.sink3.channel = channel3
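
Applied to the configuration in the question, the same idea can be sketched as below: one agent hosting two independent flows, joined by an Avro sink and an Avro source on the same port. This is a minimal sketch, not a tested setup. The component names (`collector`, `forward`, `hdfsOut`, `clientChannel`, `collectorChannel`) and port 4141 are assumptions chosen for illustration; the paths come from the question. Port 80 is replaced because binding to ports below 1024 normally requires root on Linux.

```properties
# One agent ("agent") hosting two independent flows:
#   client flow:    exec source -> clientChannel    -> avro sink
#   collector flow: avro source -> collectorChannel -> hdfs sink
agent.sources  = reader collector
agent.channels = clientChannel collectorChannel
agent.sinks    = forward hdfsOut

# Client flow: tail a file and forward the events over Avro RPC
agent.sources.reader.type     = exec
agent.sources.reader.command  = tail -f /home/flume/avro/source.txt
agent.sources.reader.channels = clientChannel

agent.sinks.forward.type     = avro
agent.sinks.forward.hostname = 127.0.0.1
# Assumed port: any free, unprivileged port (above 1023) should do
agent.sinks.forward.port     = 4141
agent.sinks.forward.channel  = clientChannel

agent.channels.clientChannel.type = memory

# Collector flow: receive the Avro events and write them to HDFS
agent.sources.collector.type     = avro
agent.sources.collector.bind     = 127.0.0.1
# Must match the port of the Avro sink above
agent.sources.collector.port     = 4141
agent.sources.collector.channels = collectorChannel

agent.sinks.hdfsOut.type      = hdfs
agent.sinks.hdfsOut.hdfs.path = /var/flume/avro
agent.sinks.hdfsOut.channel   = collectorChannel

agent.channels.collectorChannel.type = memory
```

Each source writes to its own channel and each sink reads from exactly one channel, so the two flows only meet at the Avro port. If you start the agent by hand rather than through Cloudera Manager, the name passed to `flume-ng agent --name` has to match the property prefix (`agent` here).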

Upvotes: 1
