Reputation: 2043
Regarding the configuration below, my understanding is that Flume reads messages from the Kafka topic source-topic, pushes each message/event to the Kafka channel (topic test-topic), and the sink then consumes it and writes it to Elasticsearch.
To test this flow, I explicitly pushed one message/event to the Kafka topic source-topic and expected to see it on the sink side, but it never arrived.
While debugging, I assumed the message/event must be sitting in the Kafka channel. However, when I ran bin/kafka-topics.sh --list --zookeeper localhost:2181, test-topic was not in the output.
So my question is: is the channel's topic name not an actual Kafka topic?
If it is not, how can I query the events in the Kafka channel? Or could someone help me understand this flow?
test.sources = ks
test.sinks = es
test.channels = kc
# SOURCES
test.sources.ks.type = org.apache.flume.source.kafka.KafkaSource
test.sources.ks.zookeeperConnect = 127.0.0.1:2181
test.sources.ks.topic = source-topic
test.sources.ks.groupId = cst
test.sources.ks.batchSize = 1000
test.sources.ks.batchDurationMillis = 1000
test.sources.ks.kafka.consumer.timeout.ms = 100
test.sources.ks.kafka.auto.offset.reset = smallest
# sink
test.sinks.es.type = org.es.TestElasticSearchSink
test.sinks.es.hostNames = 127.0.0.1:9200
test.sinks.es.indexName = test-idx
test.sinks.es.batchSize = 1000
test.sinks.es.iaCacheLifetime = 20
# Normal channel
test.channels.kc.type = org.kc.TestKafkaChannel
test.channels.kc.capacity = 10000
test.channels.kc.transactionCapacity = 1000
test.channels.kc.brokerList = 127.0.0.1:9092
test.channels.kc.topic = test-topic
test.channels.kc.zookeeperConnect = 127.0.0.1:2181
test.channels.kc.parseAsFlumeEvent = false
test.channels.kc.readSmallestOffset = true
test.channels.kc.groupId = test-flume
Upvotes: 0
Views: 321
Reputation: 191743
You will probably want to pre-create all necessary Kafka topics before starting Flume. However, it's not clear what org.kc.TestKafkaChannel or org.es.TestElasticSearchSink are. Flume provides its own classes for both of these (a Kafka channel and an Elasticsearch sink), I believe, so anything "not working" would most likely begin in one of your "custom" classes here...
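As a rough sketch of what that could look like with the stock classes, assuming the Flume 1.6-style property names already used in the question: the two `type` values are Flume's built-in Kafka channel and Elasticsearch sink, and port 9300 is an assumption based on the built-in sink defaulting to the Elasticsearch transport port. Note that the posted config shows no `channels` / `channel` lines binding the source and sink to `kc`; they may simply have been left out of the post, but without them nothing would flow.

```properties
# Sketch only: built-in Flume classes swapped in for the custom ones.
# Pre-create the channel topic first (ZooKeeper-era Kafka CLI), e.g.:
#   bin/kafka-topics.sh --create --zookeeper localhost:2181 \
#     --replication-factor 1 --partitions 1 --topic test-topic

test.sources = ks
test.sinks = es
test.channels = kc

# Source: reads source-topic and is bound to the channel
test.sources.ks.type = org.apache.flume.source.kafka.KafkaSource
test.sources.ks.zookeeperConnect = 127.0.0.1:2181
test.sources.ks.topic = source-topic
test.sources.ks.groupId = cst
test.sources.ks.channels = kc

# Channel: Flume's own Kafka channel, backed by test-topic
test.channels.kc.type = org.apache.flume.channel.kafka.KafkaChannel
test.channels.kc.brokerList = 127.0.0.1:9092
test.channels.kc.zookeeperConnect = 127.0.0.1:2181
test.channels.kc.topic = test-topic
test.channels.kc.groupId = test-flume
test.channels.kc.parseAsFlumeEvent = false
test.channels.kc.readSmallestOffset = true

# Sink: Flume's own Elasticsearch sink, also bound to the channel
# (9300 assumes the transport port; adjust for your cluster)
test.sinks.es.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
test.sinks.es.hostNames = 127.0.0.1:9300
test.sinks.es.indexName = test-idx
test.sinks.es.batchSize = 1000
test.sinks.es.channel = kc
```

If the flow works with these classes, that would point at the custom channel or sink as the place to keep debugging.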
Alternatively, Kafka Connect already has an Elasticsearch sink connector, so you don't need an intermediate Kafka topic just to send data between Kafka and Elasticsearch. Logstash would work as well.
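For the Kafka Connect route, a minimal standalone connector config might look roughly like this. It assumes the Confluent Elasticsearch sink connector is installed on the Connect worker and that the messages are schemaless JSON; the connector name and the `key.ignore` / `schema.ignore` choices are illustrative, not something taken from the question.

```properties
# Sketch of a standalone Kafka Connect config for the Confluent
# Elasticsearch sink connector; values are placeholders for this setup.
name = elasticsearch-sink
connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max = 1

# Read straight from the original topic; no intermediate channel topic
topics = source-topic
connection.url = http://127.0.0.1:9200

# Assumes schemaless JSON values and no meaningful record keys
key.ignore = true
schema.ignore = true
```

As far as I know, the connector names the target index after the topic by default, so the data would land in an index called source-topic rather than test-idx unless you remap it.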
Upvotes: 0