Gul

In a Kafka Streams topology with the 'exactly_once' processing guarantee, a message is lost on exception

I need to process messages from Kafka without losing any message, and I also need to preserve message order. So I used transactions and enabled the 'exactly_once' processing guarantee in my Kafka Streams topology. I assumed that topology processing would be 'all or nothing', i.e. that a message's offset is committed only after the last node has successfully processed the message.
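The 'all or nothing' behaviour assumed above can be pictured with a toy, stdlib-only model. This is not Kafka code: `CommitAfterSuccess`, `run` and `committedOffset` are hypothetical names, and a single in-memory list stands in for one partition. It only illustrates the intended invariant: the committed offset advances after a record is processed successfully, so a record whose processing failed is read again on the next attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy, in-memory model (hypothetical, NOT the Kafka API): one partition log plus
// a committed offset. The offset advances only after processing succeeds.
public class CommitAfterSuccess {
    private final List<String> log;
    private long committedOffset = 0; // offset of the next record to read

    public CommitAfterSuccess(List<String> log) {
        this.log = log;
    }

    /** Processes records from the committed offset and stops at the first failure. */
    public void run(Consumer<String> processor) {
        while (committedOffset < log.size()) {
            String record = log.get((int) committedOffset);
            try {
                processor.accept(record);
            } catch (RuntimeException e) {
                // Failure: leave the committed offset untouched (like an aborted transaction).
                return;
            }
            committedOffset++; // success: "commit" the position of the next record
        }
    }

    public long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        CommitAfterSuccess task = new CommitAfterSuccess(List.of("msg-0", "msg-1"));
        List<String> stored = new ArrayList<>();

        // First attempt: the "database" is down, so msg-0 fails and nothing is committed.
        task.run(r -> { throw new IllegalStateException("database down"); });
        System.out.println(task.committedOffset()); // 0

        // Second attempt after recovery: msg-0 is read again and stored.
        task.run(stored::add);
        System.out.println(task.committedOffset() + " " + stored); // 2 [msg-0, msg-1]
    }
}
```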

However, consider a failure scenario: the database is down, so the processor fails to store the message and throws an exception. At this point the topology dies as intended and is recreated automatically on rebalance. I expected the topology either to re-consume the original message from the Kafka topic at that point or, on application restart, to re-consume it then. Instead, the original message seems to disappear and is never consumed or processed after the topology died.

What do I need to do to reprocess the original message sent to the Kafka topic? Or which Kafka configuration needs to change? Do I need to manually assign a state store and keep track of processed messages in a changelog topic?

Topology:

@Singleton
public class EventTopology extends Topology {

    private final Deserializer<String> deserializer = Serdes.String().deserializer();
    private final Serializer<String> serializer = Serdes.String().serializer();

    private final EventLogMessageSerializer eventLogMessageSerializer;
    private final EventLogMessageDeserializer eventLogMessageDeserializer;
    private final EventLogProcessorSupplier eventLogProcessorSupplier;

    @Inject
    public EventTopology(EventsConfig eventsConfig,
                         EventLogMessageSerializer eventLogMessageSerializer,
                         EventLogMessageDeserializer eventLogMessageDeserializer,
                         EventLogProcessorSupplier eventLogProcessorSupplier) {

        this.eventLogMessageSerializer = eventLogMessageSerializer;
        this.eventLogMessageDeserializer = eventLogMessageDeserializer;
        this.eventLogProcessorSupplier = eventLogProcessorSupplier;
        init(eventsConfig);
    }

    private void init(EventsConfig eventsConfig) {

        var topics = eventsConfig.getTopicConfig().getTopics();
        String eventLog = topics.get("eventLog");

        addSource("EventsLogSource", deserializer, eventLogMessageDeserializer, eventLog)
             .addProcessor("EventLogProcessor", eventLogProcessorSupplier, "EventsLogSource");
    }
}

Processor:

@Singleton
@Slf4j
public class EventLogProcessor implements Processor<String, EventLogMessage> {

    private final EventLogService eventLogService;
    private ProcessorContext context;

    @Inject
    public EventLogProcessor(EventLogService eventLogService) {
        this.eventLogService = eventLogService;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, EventLogMessage value) {
        log.info("Processing EventLogMessage={}", value);

        try {
            eventLogService.storeInDatabase(value);
            context.commit();

        } catch (Exception e) {
            log.warn("Failed to process EventLogMessage={}", value, e);
            throw e;
        }
    }
    
    @Override
    public void close() {
    }
}

Configuration:

eventsConfig:
  saveTopicsEnabled: false
  topologyConfig:
    environment: "LOCAL"
    broker: "localhost:9093"
    enabled: true
    initialiseWaitInterval: 3 seconds
    applicationId: "eventsTopology"
    config:
      auto.offset.reset: latest
      session.timeout.ms: 6000
      fetch.max.wait.ms: 7000
      heartbeat.interval.ms: 5000
      connections.max.idle.ms: 7000
      security.protocol: SSL
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      max.poll.records: 5
      processing.guarantee: exactly_once
      metric.reporters: com.simple.metrics.kafka.DropwizardReporter
      default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
      enable.idempotence: true
      request.timeout.ms: 8000
      acks: all
      batch.size: 16384
      linger.ms: 1
      enable.auto.commit: false
      state.dir: "/tmp"
  topicConfig:
      topics:
        eventLog: "EVENT-LOG-LOCAL"
      kafkaTopicConfig:
        partitions: 18
        replicationFactor: 1
        config:
          retention.ms: 604800000

Test:

Feature: Feature covering the scenarios to process event log messages produced by external client.

  Background:
    Given event topology is healthy

  Scenario: event log messages produced are successfully stored in the database
    Given database is down
    And the following event log messages are published
      | deptId | userId     | eventType | endDate              | eventPayload_partner |
      | dept-1 | user-1234  | CREATE    | 2021-04-15T00:00:00Z | PARTNER-1            |
    When database is up
    And database is healthy
    Then event log stored in the database as follows
      | dept_id | user_id   | event_type | end_date             | event_payload           |
      | dept-1  | user-1234 | CREATE     | 2021-04-15T00:00:00Z | {"partner":"PARTNER-1"} |

Logs:

INFO  [data-plane-kafka-request-handler-1] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group eventsTopology in state PreparingRebalance with old generation 0 (__consumer_offsets-0) (reason: Adding new member eventsTopology-57fdac0e-09fb-4aa0-8b0b-7e01809b31fa-StreamThread-1-consumer-96a3e980-4286-461e-8536-5f04ccb2c778 with group instance id None) 
INFO  [executor-Rebalance] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Stabilized group eventsTopology generation 1 (__consumer_offsets-0) 
INFO  [data-plane-kafka-request-handler-2] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Assignment received from leader for group eventsTopology for generation 1 
INFO  [data-plane-kafka-request-handler-1] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_0 with producerId 0 and producer epoch 0 on partition __transaction_state-4 
INFO  [data-plane-kafka-request-handler-6] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_1 with producerId 1 and producer epoch 0 on partition __transaction_state-3 
...
INFO  [data-plane-kafka-request-handler-0] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_16 with producerId 17 and producer epoch 0 on partition __transaction_state-37 
INFO  [data-plane-kafka-request-handler-4] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_1 with producerId 18 and producer epoch 0 on partition __transaction_state-42 
INFO  [data-plane-kafka-request-handler-6] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_0 with producerId 19 and producer epoch 0 on partition __transaction_state-43 
...
INFO  [data-plane-kafka-request-handler-3] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_17 with producerId 34 and producer epoch 0 on partition __transaction_state-45 
INFO  [data-plane-kafka-request-handler-5] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_16 with producerId 35 and producer epoch 0 on partition __transaction_state-46 
INFO  [pool-26-thread-1] ManagerClient - Manager request {uri:http://localhost:8081/healthcheck, method:GET, body:'', headers:{}} 
INFO  [pool-26-thread-1] ManagerClient - Manager response from with body {"Database":{"healthy":true},"eventsTopology":{"healthy":true}} 
INFO  [dw-admin-130] KafkaConnectionCheck - successfully connected to kafka broker: localhost:9093 
INFO  [kafka-producer-network-thread | EVENT-LOG-LOCAL-test-client-id] LocalTestEnvironment - Message: ProducerRecord(topic=EVENT-LOG-LOCAL, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={"endDate":1618444800000,"deptId":"dept-1","userId":"user-1234","eventType":"CREATE","eventPayload":{"previousEndDate":null,"partner":"PARTNER-1","info":null}}, timestamp=null) pushed onto topic: EVENT-LOG-LOCAL 
INFO  [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] EventLogProcessor - Processing EventLogMessage=EventLogMessage(endDate=Thu Apr 15 01:00:00 BST 2021, deptId=dept-1, userId=user-1234, eventType=CREATE, eventPayload=EventLogMessage.EventPayload(previousEndDate=null, partner=PARTNER-1, info=null)) 
WARN  [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] EventLogProcessor - Failed to process EventLogMessage=EventLogMessage(endDate=Thu Apr 15 01:00:00 BST 2021, deptId=dept-1, userId=user-1234, eventType=CREATE, eventPayload=EventLogMessage.EventPayload(previousEndDate=null, partner=PARTNER-1, info=null)) 
    exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at manager.service.EventLogService.storeInDatabase(EventLogService.java:24)
    at manager.topology.processor.EventLogProcessor.process(EventLogProcessor.java:47)
    at manager.topology.processor.EventLogProcessor.process(EventLogProcessor.java:19)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
ERROR [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] Failed to process stream task 0_8 due to the following error: 
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_8, processor=EventsLogSource, topic=EVENT-LOG-LOCAL, partition=8, offset=0, stacktrace=exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
ERROR [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:  
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_8, processor=EventsLogSource, topic=EVENT-LOG-LOCAL, partition=8, offset=0, stacktrace=exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
ERROR [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] org.apache.kafka.streams.KafkaStreams - stream-client [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3] All stream threads have died. The instance will be in error state and should be closed. 
Exception: java.lang.IllegalStateException thrown from the UncaughtExceptionHandler in thread "eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1"
INFO  [executor-Heartbeat] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Member eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1-consumer-f11ca299-2a68-4317-a559-dd1b96cd431f in group eventsTopology has failed, removing it from the group 
INFO  [executor-Heartbeat] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group eventsTopology in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: removing member eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1-consumer-f11ca299-2a68-4317-a559-dd1b96cd431f on heartbeat expiration) 
INFO  [data-plane-kafka-request-handler-2] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Stabilized group eventsTopology generation 2 (__consumer_offsets-0) 
INFO  [data-plane-kafka-request-handler-6] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Assignment received from leader for group eventsTopology for generation 2 
INFO  [data-plane-kafka-request-handler-0] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_0 with producerId 0 and producer epoch 1 on partition __transaction_state-4 
...
INFO  [data-plane-kafka-request-handler-0] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_16 with producerId 35 and producer epoch 1 on partition __transaction_state-46 
INFO  [main] Cluster - New databse host localhost/127.0.0.1:59423 added 

com.jayway.awaitility.core.ConditionTimeoutException: Condition defined as a lambda expression in steps.EventLogSteps
Expecting:
 <0>
to be equal to:
 <1>
but was not. within 20 seconds. 
