Kafka streams exactly once delivery

My goal is to consume from topic A, do some processing and produce to topic B, as a single atomic action. To achieve this I see two options:

  1. Use a spring-kafka @Kafkalistener and a KafkaTemplate as described here.
  2. Use Streams eos (exactly-once) functionality.

I have successfully verified option #1. By successfully, I mean that if my processing fails (IllegalArgumentException is thrown) the consumed message from topic A keeps being consumed by the KafkaListener. This is what I expect, as the offset is not committed and DefaultAfterRollbackProcessor is used.

I am expecting to see the same behaviour if instead of a KafkaListener I use a stream for consuming from topic A, processing and sending to topic B (option #2). But even though while I process an IllegalArgumentException is thrown the message is only consumed once by the stream. Is this the expected behaviour?

In the Streams case the only configuration I have is the following:

public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig  kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");        
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);

//required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
// see here:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
            log.debug("creating and starting a new StreamsThread ..");
    return streamsBuilderFactoryBean;

My Stream is like this:

public SpecificAvroSerde<InvoiceEvents> eventSerde;

private TaxService taxService;

public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {

    KStream<String, InvoiceEvents> kStream ="A",
            Consumed.with(Serdes.String(), eventSerde));

        .mapValues(v -> 
                // get tax from possibly remote service
                // an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
                int tax = taxService.getTaxForInvoice(v);
                // create a TaxCalculated event
                InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
                log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
                return taxCalculatedEvent;
        .to("B", Produced.with(Serdes.String(), eventSerde));

    return kStream;

The happy path streams scenario works: if no exception is thrown while processing, message appears properly in topic B.

My unit test:

public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
                        .thenThrow(new IllegalArgumentException("Tax calculation failed"));

    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));

     Properties producerConfig = new Properties();
     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
     producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
     producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
     producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
     producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");

    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);

    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
// ...

Update: In my unit test I sent 50 invoiceEvents (orderId=1,...,50), I process them and sent them to a destination topic.

In my logs the behaviour I see is as follows:

invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IlleagalArgumentException thrown stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IlleagalArgumentException thrown
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {**"id": "46"**, ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task. stream starts..
invoiceEvent.**orderId = 46** → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed

Why after the 2nd failure, it re-consumes from invoiceEvent.orderId = 46?

The key points to have option 2 (Streams Transactions) working are:

  • Assign a Thread.UncaughtExceptionHandler() so that you start a new StreamThread in case of any uncaught exception (by default the StreamThread dies - see code snippet that follows). This can even happen if the production to Kafka broker fails, it does not have to be related to your business logic code in the stream.
  • Consider setting a policy for handling de-serailization of messages (when you consume). Check DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG (javadoc). For example, should you ignore and consume next message or stop consuming from the relevant Kafka partition.
  • In the case of Streams, even if you set MAX_POLL_RECORDS_CONFIG=1 (one record per poll/batch), still consumed offsets and produced messages are not committed per message. This case leads to cases as the one described in the question (see "Why after the 2nd failure, it re-consumes from invoiceEvent.orderId = 46?").
  • Kafka transactions simply do not work on Windows yet. The fix will be delivered in Kafka 1.1.1 (
  • Consider checking how you handle serialisation exceptions (or in general exceptions during production) (here and here)

    public class KafkaStreamsConfiguration {
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public StreamsConfig  kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
            // this should be enough to enable transactions
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            return new StreamsConfig(props);
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) 
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
                log.debug("creating and starting a new StreamsThread ..");
        return streamsBuilderFactoryBean;

