Mohan

Reputation: 741

Kafka consumer message commit issue

Kafka newbie.

Kafka version: 2.3.1

I am trying to consume Kafka messages from two topics using Spring Cloud. I have done little configuration beyond the Kafka binder and the simple settings shown below. Whenever the log message "Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery" appears, a batch of messages that has already been processed gets processed again. I am not sure what is happening.

spring.cloud.stream.kafka.binder.brokers: xxxxx:9094
spring:
  cloud:
    stream:
      default:
        group: bbb-bl-kyc
      bindings:
        input:
          destination: bbb.core.sar.blul.events,bbb.core.sar.bluloc.events
          contentType: application/json
          consumer:
            headerMode: embeddedHeaders  

spring.kafka.consumer.properties.spring.json.trusted.packages: "*"
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
#Custom Serializer configurations to secure data
spring.cloud.stream.kafka.binder.configuration:
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArraySerializer
  value.deserializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArrayDeserializer
  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer



2020-05-29 07:01:11.389  INFO 1 --- [container-0-C-1] p.a.b.k.service.KYCOrchestrationService  : Done with Customer xxxx MS call response handling  Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-29 07:01:11.393  INFO 1 --- [container-0-C-1] p.a.b.kyc.service.DMSIntegrationService  : Message written to the DMS topic successfully 159073553171893
2020-05-29 07:01:11.394  INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService    : Message written to Admin console Application Log topic successfully  Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-30 17:21:13.140  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:13.122  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:14.522  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:14.692  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:15.151  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-4-f5a03efd-75cd-425b-94e1-efd3d728d7ca is not valid.
2020-05-30 17:21:15.152  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.173  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions revoked: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.141  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-2-52012bae-1b22-4211-b107-803fb3765720 is not valid.
2020-05-30 17:21:15.175  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:15.176  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions revoked: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.537  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.blul.events-0 to offset 4.
2020-05-30 17:21:18.538  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.bluloc.events-0 to offset 0.
2020-05-30 17:21:18.621  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions assigned: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:18.625  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions assigned: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:18.822  INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener  : Initiating KYC Orchestration 159071814927374
2020-05-30 17:21:18.826  INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener  : Initiating KYC Orchestration null
2020-05-30 17:21:18.928  INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService    : Message written to Admin console Application topic successfully Confm Id: null Appln Id: XQZ58K3H3XZADTAT

Upvotes: 0

Views: 1725

Answers (1)

Michael Heil

Reputation: 18525

With the consumer configuration left largely at its defaults, you get at-least-once delivery semantics.

When the group coordinator is temporarily unavailable, your consumer cannot commit the offsets of the messages it has processed. After re-joining the group, the consumer processes those same messages again (because their offsets were never committed), which leads to duplicates.

You can find more details on GroupCoordinator and delivery semantics here
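
Since at-least-once delivery means the same message can arrive twice, one common way to cope is to make the handler idempotent. The sketch below is only an illustration under assumptions: the class name IdempotentProcessor and its methods are hypothetical, the message ID is assumed to be something like the confirmation ID from your logs, and the in-memory set would have to be replaced by a durable store in a real service.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch: skips messages whose ID has already been handled,
 * so a redelivery after a rebalance has no second effect.
 */
public class IdempotentProcessor {
    // In-memory only (assumption for brevity); this does not survive a restart.
    private final Set<String> processedIds = new HashSet<>();

    /** Returns true if the message was handled now, false if it was a duplicate. */
    public boolean process(String messageId, String payload) {
        // Set.add returns false when the ID is already present.
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery, skip it
        }
        // Business logic for the payload would run here. If it can fail,
        // the ID should only be recorded after it succeeds.
        return true;
    }

    public int processedCount() {
        return processedIds.size();
    }

    public static void main(String[] args) {
        IdempotentProcessor processor = new IdempotentProcessor();
        System.out.println(processor.process("159073553171893", "event")); // prints "true"
        System.out.println(processor.process("159073553171893", "event")); // prints "false"
    }
}
```

With a check like this in the listener, messages replayed after the coordinator comes back are detected and skipped instead of being written to the downstream topics a second time.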

Upvotes: 1
