Reputation: 647
I am trying to create kafka producer in trasnsaction i.e. i want to write a group of msgs if anyone fails i want to rollback all the msg.
kafkaProducer.beginTransaction();
try
{
// code to produce to kafka topic
}
catch(Exception e)
{
kafkaProducer.abortTransaction();
}
kafkaProducer.commitTransaction();
The problem is for single thread above works just fine, but when multiple threads writes it throws exception
Invalid transaction attempted from state IN_TRANSITION to IN_TRANSITION
while debugging I found that if the thread1 transaction is in progress and thread2 also says beingTransaction it throws this exception. What I dont find if how to solve this issue. One possible thing I could find is creating a pool of produce.
Is there any already available API for kafka producer pool or i will have to create my own.
Below is the improvement jira already reported for this. https://issues.apache.org/jira/browse/KAFKA-6278
Any other suggestion will be really helpful
Upvotes: 0
Views: 1288
Reputation: 304
Not sure if this was resolved. you can use apache common pool2 to create a producer instance pool. In the create() method of the factory implementation you can generate and assign a unique transactionalID to avoid a conflict (ProducerFencedException)
Upvotes: 0
Reputation: 26950
You can only have a single transaction in progress at a time with a producer instance.
If you have multiple threads doing separate processing and they all need exactly once semantics, you should have a producer instance per thread.
Upvotes: 1