I am new to Apache Flume (https://flume.apache.org/). For one of my use cases, I need to move data from a Kafka topic on one cluster (bootstrap: bootstrap1, topic: topic1) to a topic with a different name on a different cluster (bootstrap: bootstrap2, topic: topic2). There are other use cases in the same project for which Flume fits best, and I need to use the same Flume pipeline for this use case, even though there are other options for copying from Kafka to Kafka.
I tried the configurations below; the result for each option is noted with it.
#1: netcat source (tested via telnet) to Kafka sink (bootstrap2, topic2) --> works perfectly. Config:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic2
a1.sinks.k1.kafka.bootstrap.servers = bootstrap2
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#2: Kafka as source (bootstrap1, topic1) and logger as sink --> works perfectly. Config:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 10
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bootstrap1
a1.sources.r1.kafka.topics = topic1
a1.sources.r1.kafka.consumer.group.id = flume-gis-consumer
a1.sources.r1.backoffSleepIncrement = 1000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#3: Kafka as source (bootstrap1, topic1) and Kafka as sink (bootstrap2, topic2) --> gives the error shown below the config.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 10
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bootstrap1
a1.sources.r1.kafka.topics = topic1
a1.sources.r1.kafka.consumer.group.id = flume-gis-consumer1
a1.sources.r1.backoffSleepIncrement = 1000
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic2
a1.sinks.k1.kafka.bootstrap.servers = bootstrap2
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Error:
(kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:968)] [Producer clientId=producer-1] Error while fetching metadata with correlation id 85 : {topic1=UNKNOWN_TOPIC_OR_PARTITION}
The above error repeats continuously.
Error upon terminating the flume-ng command:
(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:268)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: Could not send event
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:234)
... 3 more
Seeking help from the Stack Overflow community on why configuration #3 fails with this error, and how to fix it.
Answer:
I encountered the same issue today. My case is even worse because both topics are hosted on a single Kafka cluster.
It is really misleading that the producer thread in the Kafka sink produces back to the Kafka source topic.
I fixed the issue by setting allowTopicOverride to false for the Kafka sink.
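Applied to config #3 in the question, that is one extra line in the sink section (using the a1/k1 names from the question's config):
a1.sinks.k1.allowTopicOverride = false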
Quote from the Kafka sink section of the Flume documentation:
allowTopicOverride: Default is true. When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).
topicHeader: When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.
And in the Kafka source section:
setTopicHeader: Default is true. When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
So by default, Apache Flume stores the Kafka source topic in the topicHeader header of each event, and the Kafka sink by default writes to the topic specified in that header rather than the one in kafka.topic. That is why the sink in config #3 keeps trying to produce to topic1 on bootstrap2, where that topic does not exist, giving UNKNOWN_TOPIC_OR_PARTITION.
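Based on the setTopicHeader doc quoted above, disabling the header on the source side should prevent the loopback as well (I have only verified the sink-side fix):
a1.sources.r1.setTopicHeader = false
With either setting, the sink falls back to the topic configured in a1.sinks.k1.kafka.topic, i.e. topic2.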