Reputation: 51
We have a requirement where we are consuming messages from one topic then there is some enrichment happening and then we are publishing the message to another topic. below are the events
I am using Spring cloud kafka binder and things are working fine. Recently we introduced idempotent producer and included transactionIdPrefix property and we observed that outbound channel is started sending 2 messages in the topic as it should have sent one message only. one message with actual json value another message with value as 'b'\x00\x00\x00\x00\x06&' characters. Below is the code and config. If I remove transactionIdPrefix then I could see one message only sent to outbound topic.
@StreamListener("INPUT")
@SendTo("OUTPUT")
public void consumer(Message message){
Acknowledgement ack = messge.getHeaders().get(KafkaHeaders.ACKNOWLEDGEMENT,Acknowledgement.class))
try{
String inputMessage = message.getPayload.toString();
String enrichMessage = // Enrichment on inputMessage
ack.acknowledgement()
return enrichMessage;
}catch( Exception exp){
ack.acknowledgement();
throw exp;
}
}
Configs are
Messages that sent to the outbound topic are below.
ConsumerRecord(topic = test, partition = 1, offset = 158, CreateTime = 1574297903694, timestamp= 1238776543, time_stamp_type=0, key=None value=b'{"name":"abc","age":"20"}',checksum=None,serialized_key_size=-1,serialized_value_size=159)
ConsumerRecord(topic = test, partition = 1, offset = 158, CreateTime = 1574297903694, timestamp= 1238776543, time_stamp_type=0, key=b'\x00\x00\x00\x01' value=b'\x00\x00\x00\x00\x06&',checksum=None,serialized_key_size=-1,serialized_value_size=159)
Even in Dlq Topic message goes twice.
Appreciate if anybody can provide any pointers on this issue we are facing.
Cheers
Upvotes: 1
Views: 531
Reputation: 200
I believe your code is working fine. Transactional producers technically do send messages twice - the uncommitted transactional records and then those very same records but marked as a complete/committed transaction. In other words, you should check to see if you configured your consumers (the ones consuming from the transactional topic in app) isolation.level
to read_committed
.
consumeRecords -> producer initiates the transaction -> enrichments/processing -> producer send message (UNCOMMITTED) -> finish processing the rest of the records in last .poll()
batch -> commit / abort transaction (COMMITTED) -> repeat
Upvotes: 0