Maazen

Reputation: 107

Kafka Streams shuts down if sink node/topic not reachable?

I want to test the scenario where a Kafka Streams application (using the Processor API) reads from a source topic and writes to a list of topics, and one or two of those topics are not reachable (failure test: I am trying to simulate this by adding one or two sink topics that do not exist in the cluster).

     topology.addSource("mysource", "source_topic");
     topology.addProcessor("STREAM_PROCESSOR", () -> new SourceProcessor(), "mysource");
     topology.addSink("SINK_1", "TOPIC_1", "STREAM_PROCESSOR");
     topology.addSink("SINK_2", "TOPIC_2", "STREAM_PROCESSOR");
     topology.addSink("SINK_3", "TOPIC_3", "STREAM_PROCESSOR"); // this topic is not present in the cluster

     // inside SourceProcessor, records are forwarded to one of the sink nodes:
     sourceContext.forward(eventName, eventMessage, To.child("SINK_1")); // or "SINK_2" / "SINK_3"
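
For reference, SourceProcessor is roughly along these lines (a simplified sketch; the actual routing logic is omitted and the class body here is illustrative only):

     import org.apache.kafka.streams.processor.AbstractProcessor;
     import org.apache.kafka.streams.processor.To;

     public class SourceProcessor extends AbstractProcessor<String, String> {

         @Override
         public void process(String eventName, String eventMessage) {
             // forward each record to one (or more) of the sink nodes defined above;
             // forwarding to all three sinks here is only for illustration
             context().forward(eventName, eventMessage, To.child("SINK_1"));
             context().forward(eventName, eventMessage, To.child("SINK_2"));
             context().forward(eventName, eventMessage, To.child("SINK_3"));
         }
     }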

My understanding is that Kafka Streams should report an error for the topic that is not present and continue forwarding records to TOPIC_1 and TOPIC_2, which do exist.

But the behavior I see is that it fails with the following error:

     Exception in thread "StreamProcessor-56da56e4-4ab3-4ca3-bf48-b059558b689f-StreamThread-1" 
     org.apache.kafka.streams.errors.StreamsException: 
     task [0_0] Abort sending since an error caught with a previous record (timestamp 1592940025090) to topic "TOPIC_X" due to 
     org.apache.kafka.common.errors.TimeoutException: Topic "TOPIC_X" not present in metadata after 60000 ms.
     Timeout exception caught when sending record to topic "TOPIC_X". This might happen if the producer cannot send data to the
     Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the
     broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.

Is this the correct way of simulating non-reachable topics or missing-topic issues? Also, why does Kafka Streams shut down with the above error even though we are handling Streams and topology exceptions? Kafka Streams should not shut down if one of the sink topics is unavailable or unreachable for some reason, right? Kindly suggest.

When the above error occurs, I want to catch the StreamsException and forward the failed record to an error topic; however, Kafka Streams stops prematurely.

    catch (StreamsException e)
    {
        context.forward("", "", To.child(Error_Topic));
    }
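
For clarity, what I am trying to achieve is to wire an additional sink node for the error topic into the same topology and fall back to it when a write fails (ERROR_SINK and error_topic are placeholder names, not part of my actual topology above):

    // additional sink node for the error topic
    topology.addSink("ERROR_SINK", "error_topic", "STREAM_PROCESSOR");

    // inside the processor:
    try {
        context().forward(eventName, eventMessage, To.child("SINK_3"));
    } catch (StreamsException e) {
        // intended fallback: route the record to the error sink instead
        context().forward(eventName, eventMessage, To.child("ERROR_SINK"));
    }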

Is this an expected behavior ?

Refer to https://docs.confluent.io/current/streams/developer-guide/manage-topics.html#user-topics : does this mean that a non-existent topic is not allowed as a sink node in a Kafka Streams topology? Please confirm.

Upvotes: 0

Views: 1464

Answers (1)

Matthias J. Sax

Reputation: 62330

It's by design that Kafka Streams shuts down if it cannot write into a sink topic. The reason is that, by default, Kafka Streams guarantees at-least-once processing semantics; if it could not write data to one sink topic but continued anyway, at-least-once processing would be violated because there would be data loss in that sink topic.

There is a `default.production.exception.handler` configuration that might help. It allows you to swallow certain exceptions when writing data into an output topic. However, note that this implies data loss for the corresponding topic.
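
A minimal sketch of such a handler and how to register it (the handler name and the choice to swallow only timeout exceptions are just an example):

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class SkipOnTimeoutHandler implements ProductionExceptionHandler {

        @Override
        public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                         Exception exception) {
            // drop the record and keep the application running for timeouts;
            // fail (and shut down) for every other production error
            return exception instanceof TimeoutException
                    ? ProductionExceptionHandlerResponse.CONTINUE
                    : ProductionExceptionHandlerResponse.FAIL;
        }

        @Override
        public void configure(Map<String, ?> configs) { }
    }

    // register the handler in the Streams configuration
    Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
              SkipOnTimeoutHandler.class);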

Upvotes: 1
