Reputation: 741
Kafka newbie.
Kafka version: 2.3.1
I am trying to consume Kafka message from two topics using spring cloud. I have not done much configuration apart from kafka binder and some simple config like below. Whenever (Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery)happen, bunch of message which has already processed is getting processed again. Not sure what is happening.
spring.cloud.stream.kafka.binder.brokers: xxxxx:9094
spring:
cloud:
stream:
default:
group: bbb-bl-kyc
bindings:
input:
destination: bbb.core.sar.blul.events,bbb.core.sar.bluloc.events
contentType: application/json
consumer:
headerMode: embeddedHeaders
spring.kafka.consumer.properties.spring.json.trusted.packages: "*"
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
#Custom Serializer configurations to secure data
spring.cloud.stream.kafka.binder.configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArraySerializer
value.deserializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArrayDeserializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
2020-05-29 07:01:11.389 INFO 1 --- [container-0-C-1] p.a.b.k.service.KYCOrchestrationService : Done with Customer xxxx MS call response handling Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-29 07:01:11.393 INFO 1 --- [container-0-C-1] p.a.b.kyc.service.DMSIntegrationService : Message written to the DMS topic successfully 159073553171893
2020-05-29 07:01:11.394 INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService : Message written to Admin console Application Log topic successfully Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-30 17:21:13.140 INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:13.122 INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:14.522 INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:14.692 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:15.151 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-4-f5a03efd-75cd-425b-94e1-efd3d728d7ca is not valid.
2020-05-30 17:21:15.152 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.173 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions revoked: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.141 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-2-52012bae-1b22-4211-b107-803fb3765720 is not valid.
2020-05-30 17:21:15.175 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:15.176 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions revoked: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.537 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.blul.events-0 to offset 4.
2020-05-30 17:21:18.538 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.bluloc.events-0 to offset 0.
2020-05-30 17:21:18.621 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions assigned: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:18.625 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions assigned: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:18.822 INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener : Initiating KYC Orchestration 159071814927374
2020-05-30 17:21:18.826 INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener : Initiating KYC Orchestration null
2020-05-30 17:21:18.928 INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService : Message written to Admin console Application topic successfully Confm Id: null Appln Id: XQZ58K3H3XZADTAT
Upvotes: 0
Views: 1725
Reputation: 18525
Without changing much of the consumer configurations, you will have at least once delivery semantics.
When the Group Coordinator is temporarly not available your consumer won't be able to commit the messages it processed. After re-joining your consumer will again process same messages (as they were not committed yet) leading to duplicates.
You can find more details on GroupCoordinator and delivery semantics here
Upvotes: 1