Sach

Reputation: 51

Spring Cloud Stream Kafka binder with transactionIdPrefix sends 2 messages to the outbound topic

We have a requirement to consume messages from one topic, enrich them, and publish the enriched messages to another topic. The steps are:

  1. Consumer - consumes the message
  2. Enrichment - enriches the consumed message
  3. Producer - publishes the enriched message to the other topic

I am using the Spring Cloud Stream Kafka binder and things were working fine. We recently introduced an idempotent producer and added the transactionIdPrefix property. Since then, the outbound channel sends 2 messages to the topic when it should send only one: one message with the actual JSON value, and another whose value is the bytes b'\x00\x00\x00\x00\x06&'. The code and config are below. If I remove transactionIdPrefix, only one message is sent to the outbound topic.

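import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;

@StreamListener("INPUT")
@SendTo("OUTPUT")
public String consumer(Message<?> message) {
    // Manual acknowledgment handle, available because autoCommitOffset=false
    Acknowledgment ack = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    try {
        String inputMessage = message.getPayload().toString();
        String enrichMessage = // Enrichment on inputMessage
        ack.acknowledge();
        // The returned value is published to the OUTPUT binding
        return enrichMessage;
    } catch (Exception exp) {
        // Acknowledge on failure too, then rethrow so retry/DLQ handling applies
        ack.acknowledge();
        throw exp;
    }
}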

Configs are

  1. spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=TX-
  2. spring.cloud.stream.kafka.binder.transaction.producer.configuration.ack=all
  3. spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=10
  4. spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
  5. spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
  6. spring.cloud.stream.kafka.bindings.input.consumer.dlqName=error.topic
  7. spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=true
  8. spring.cloud.stream.kafka.bindings.input.consumer.maxAttempt=3
  9. spring.cloud.stream.kafka.binder.transaction.producer.configuration.enable.idempotence=true

The messages sent to the outbound topic are below.

  1. ConsumerRecord(topic = test, partition = 1, offset = 158, CreateTime = 1574297903694, timestamp= 1238776543, time_stamp_type=0, key=None value=b'{"name":"abc","age":"20"}',checksum=None,serialized_key_size=-1,serialized_value_size=159)

  2. ConsumerRecord(topic = test, partition = 1, offset = 158, CreateTime = 1574297903694, timestamp= 1238776543, time_stamp_type=0, key=b'\x00\x00\x00\x01' value=b'\x00\x00\x00\x00\x06&',checksum=None,serialized_key_size=-1,serialized_value_size=159)

Messages also arrive twice in the DLQ topic.

I would appreciate any pointers on this issue.

Cheers

Upvotes: 1

Views: 531

Answers (1)

Nerm

Reputation: 200

I believe your code is working fine. Transactional producers do write more than the data records to the topic: the records are first written as part of an open (uncommitted) transaction, and when the transaction finishes, a commit or abort marker is written to mark it as complete. In other words, check whether the consumers reading from the transactional topic in your app have isolation.level set to read_committed.
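As a rough sketch of the setting mentioned above, here is how a plain Kafka consumer configuration with isolation.level set to read_committed could be built. The property keys are standard Kafka consumer configs; the class name, broker address, and group id are placeholders I made up for illustration, and how you pass these properties through the binder depends on your setup.

import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Builds consumer properties; bootstrapServers and groupId are placeholder inputs.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Only return records from committed transactions
        // (the consumer default is read_uncommitted).
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "enrichment-checker");
        System.out.println(props.getProperty("isolation.level")); // prints "read_committed"
    }
}

These properties can be handed to a KafkaConsumer as-is. Whether a non-Java client honors the same setting depends on that client's transaction support.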

consume records -> producer begins the transaction -> enrichment/processing -> producer sends message (UNCOMMITTED) -> finish processing the rest of the records in the last .poll() batch -> commit / abort transaction (COMMITTED) -> repeat
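The commit / abort step in the flow above is what writes the transaction marker. To check whether the second record in the question is such a marker, its key can be decoded. This is a minimal sketch, assuming the key follows Kafka's control record layout (a 16-bit version followed by a 16-bit type, where 0 means abort and 1 means commit). The class and method names are hypothetical, and reading a 4-byte key this way is only meaningful for records the broker flagged as control records, which the Java consumer normally hides from applications.

import java.nio.ByteBuffer;

public class ControlRecordKey {

    // Decodes a control record key: int16 version, then int16 type (big-endian).
    static String describe(byte[] key) {
        if (key == null || key.length < 4) {
            return "NOT_A_CONTROL_KEY";
        }
        ByteBuffer buf = ByteBuffer.wrap(key); // ByteBuffer is big-endian by default
        short version = buf.getShort();
        short type = buf.getShort();
        switch (type) {
            case 0:
                return "ABORT (version " + version + ")";
            case 1:
                return "COMMIT (version " + version + ")";
            default:
                return "UNKNOWN type " + type + " (version " + version + ")";
        }
    }

    public static void main(String[] args) {
        // Key bytes taken from the second record in the question: b'\x00\x00\x00\x01'
        byte[] key = {0x00, 0x00, 0x00, 0x01};
        System.out.println(describe(key)); // prints "COMMIT (version 0)"
    }
}

If the key decodes as a commit marker, the extra record is transaction bookkeeping rather than a duplicate of your payload.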

Upvotes: 0
