Reputation: 2931
I am new to Kafka and have a Kafka consumer implemented in Java using the Apache Camel library. The issue I found is that the consumer takes a long time (>15 mins) to process a few messages, which is fine for our use case.
I need some config help, because the same message is re-delivered after 15 mins if it has not been processed by then (thread control does not return, I believe). I think this 15-minute window might be a default interval, but I am not sure which property controls it.
So, where do I fix the config?
My producer has these properties:
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
My consumer configs look like
<endpoint id="apolloKafkaJanitorEventListenerURI"
uri="kafka:${kafka.bootstrap.servers}?topic=${apollo.janitor.event.topic}&
groupId=${apollo.janitor.event.group.id}&
consumersCount=${apollo.janitor.event.consumer.count}&
consumerRequestTimeoutMs=${eventConsumerRequestTimeoutMs}&
sessionTimeoutMs=${eventConsumerSessionTimeoutMs}&
maxPartitionFetchBytes=${eventConsumerMaxPartitionFetchBytes}" />
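For reference, one consumer property I have seen mentioned for long processing times is max.poll.interval.ms. If the Camel Kafka component exposes it as maxPollIntervalMs (I have not verified this option name or tested it), the endpoint might be extended like this, raising the interval and shrinking the batch size:
```xml
<endpoint id="apolloKafkaJanitorEventListenerURI"
uri="kafka:${kafka.bootstrap.servers}?topic=${apollo.janitor.event.topic}&
groupId=${apollo.janitor.event.group.id}&
consumersCount=${apollo.janitor.event.consumer.count}&
maxPollIntervalMs=1800000&
maxPollRecords=1" />
```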
I have googled but didn't find any relevant issues. I found the "acks=0" property for the producer, and the following for the consumer. I have not tested it yet, but want to check whether I am on the right track first:
// Requires allowManualCommit=true (and autoCommitEnable=false) on the endpoint
KafkaManualCommit manual =
    exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
Upvotes: 0
Views: 2155
Reputation: 7005
You are trying to avoid multiple deliveries of the same message. This is the wrong approach.
In a messaging system you have to deal with messages that can be delivered multiple times, simply because redeliveries are needed to guarantee message delivery in some situations (see here for a short explanation).
You can't totally avoid multiple deliveries without sacrificing other aspects of your system.
If you instead make your consumers idempotent, they don't care whether a message is delivered multiple times by the broker. That way you don't need to restrict your broker.
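An idempotent consumer can be sketched roughly like this (a minimal illustration, not Camel-specific; the class and method names are hypothetical): side effects are keyed by a message ID, so a redelivered message is detected and skipped.
```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of an idempotent consumer: each message carries an ID,
// and the handler remembers which IDs it has already processed.
public class IdempotentHandler {
    private final Set<String> processedIds = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    // Returns true if the payload was applied, false if the ID was a duplicate.
    public boolean handle(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return false; // already processed: the redelivery is a no-op
        }
        applied.add(payload); // stand-in for the real side effect (DB write, etc.)
        return true;
    }

    public List<String> applied() {
        return applied;
    }
}
```
In a real system the set of processed IDs would live in durable storage (or the side effect itself would be naturally idempotent, like an upsert), since an in-memory set is lost on restart.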
Upvotes: 1
Reputation: 346
The problem might be at the producer end. You may need to check whether the producer is resending the message; you can use logging statements for that. Alternatively, you can enable exactly-once semantics on the Kafka producer, which only requires an extra property.
The other possibility is that your consumer is not committing its offsets. You might need to do some investigation at that end too.
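For example, the idempotent producer is enabled with the standard Kafka producer property below (in the same XML style as your producer config; enabling it also requires acks=all):
```xml
<entry key="enable.idempotence" value="true" />
<entry key="acks" value="all" />
```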
Upvotes: 1