My goal is to consume from topic A, do some processing and produce to topic B, as a single atomic action. To achieve this I see two options:
I have successfully verified option #1. By successfully, I mean that if my processing fails (IllegalArgumentException is thrown) the consumed message from topic A keeps being consumed by the KafkaListener. This is what I expect, as the offset is not committed and DefaultAfterRollbackProcessor is used.
I am expecting to see the same behaviour if instead of a KafkaListener I use a stream for consuming from topic A, processing and sending to topic B (option #2). But even though while I process an IllegalArgumentException is thrown the message is only consumed once by the stream. Is this the expected behaviour?
In the Streams case the only configuration I have is the following:
public class KafkaStreamsConfiguration {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
// this should be enough to enable transactions
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
return new StreamsConfig(props);
//required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
// see here:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
log.debug("creating and starting a new StreamsThread ..");
return streamsBuilderFactoryBean;
My Stream is like this:
public SpecificAvroSerde<InvoiceEvents> eventSerde;
private TaxService taxService;
public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {
KStream<String, InvoiceEvents> kStream ="A",
Consumed.with(Serdes.String(), eventSerde));
.mapValues(v ->
// get tax from possibly remote service
// an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
int tax = taxService.getTaxForInvoice(v);
// create a TaxCalculated event
InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
return taxCalculatedEvent;
.to("B", Produced.with(Serdes.String(), eventSerde));
return kStream;
The happy path streams scenario works: if no exception is thrown while processing, message appears properly in topic B.
My unit test:
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
.thenThrow(new IllegalArgumentException("Tax calculation failed"));
InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");
// produce with key
IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);
// wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
// ...
Update: In my unit test I sent 50 invoiceEvents (orderId=1,...,50), I process them and sent them to a destination topic.
In my logs the behaviour I see is as follows:
invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IlleagalArgumentException thrown stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IlleagalArgumentException thrown
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {**"id": "46"**, ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task. stream starts..
invoiceEvent.**orderId = 46** → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed
Why after the 2nd failure, it re-consumes from invoiceEvent.orderId = 46?
The key points to have option 2 (Streams Transactions) working are:
Consider checking how you handle serialisation exceptions (or in general exceptions during production) (here and here)
public class KafkaStreamsConfiguration {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
// this should be enough to enable transactions
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
return new StreamsConfig(props);
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig)
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
log.debug("creating and starting a new StreamsThread ..");
return streamsBuilderFactoryBean;
