Jawad Timzit

Reputation: 1

Why does my Spring Boot Event Hubs consumer process duplicate events?

I have a Spring Boot application that consumes data from Azure Event Hubs. An external producer publishes to an Event Grid topic, the topic forwards the events to an Event Hub through a subscription, and my application consumes them from the Event Hub. The consumer works: I receive the events/messages and save them to a tracker table in a database. The problem is that some events are processed and saved more than once, even though the producer is not sending duplicates.

So this is the architecture I have:

Producer -> Event Grid topic -> one Event Hub (subscribed to the Event Grid topic), one partition, one consumer group -> Spring Boot application

I debugged the application and I don't think my service method processAzureEventHubMessage is the cause: it processes one message at a time and finishes by saving it to the database, and I see "Checkpoint was updated successfully" in the log after each message. I don't know whether the checkpoint is actually being persisted. I don't know why the same message comes again from the Event Hub to my application, but I set up all my config correctly and I am not sure if something is missing in my config or my code.

Please help if you see any bug in the code that might cause the duplication issue.

My Consumer method:

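@Bean
public Consumer<Message<String>> consume() {
    return message -> {
        Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
        // Log the event metadata so a redelivered event can be recognized by its sequence number
        log.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                message.getPayload(),
                message.getHeaders().get(AzureHeaders.PARTITION_KEY),
                message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                message.getHeaders().get(EventHubsHeaders.OFFSET),
                message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
        );
        try {
            processAzureEventHubMessage(message);
            // Block until the checkpoint write has completed
            checkpointer.success().block();
            log.info("Checkpoint was updated successfully for sequence number: {}",
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER));
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged
            log.error("Error processing event", e);
        }
    };
}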

My service method, which processes the message, does some mapping, and saves the result to a database table:

public Response processAzureEventHubMessage(Message<String> message) {
    Response response = new Response();
    try {
        Data data = eventConverter.convertToObject(message.getPayload());
        Request request = new Request();

        String playerId = data.getEvent().getPlayerEvent().getPlayerId().trim();
        String status = data.getEvent().getPlayerEvent().getType().trim().toUpperCase();

        // The rest of the logic processes the data and sends it to the database
    } catch (Exception e) {
        // Catch block not shown in the post; logging is assumed here
        log.error("Error processing event", e);
    }
    return response;
}


My YAML configuration for the Event Hubs consumer:

spring:
  cloud:
    azure:
      eventhubs:
        connection-string: ......
        processor:
          checkpoint-store:
            account-name: checkpointerstorage
            container-name: messages-container

    stream:
      bindings:
        consume-in-0:
          destination: my-eventhub
          group: my-consumer-group

      eventhubs:
        bindings:
          consume-in-0:
            consumer:
              checkpoint:
                mode: MANUAL
      poller:
        initial-delay: 50
        fixed-delay: 1000
    function:
      definition: consume

Upvotes: 0

Views: 159

Answers (1)

Sampath

Reputation: 3639

"I don't know why the same message comes again from the Event Hub to my application, but I set up all my config correctly and I am not sure if something is missing in my config or my code."

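  • In the consumer method you call checkpointer.success().block(), which blocks the current thread until the checkpoint write completes. Event Hubs delivers events at least once: after a restart or a partition rebalance, the processor resumes from the last persisted checkpoint, so any event processed after that checkpoint is delivered again. Make sure that no exception is being swallowed and that the checkpoint is actually being written to the storage container.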

Consumer Method:

@Bean
public Consumer<Message<String>> consume() {
    return message -> {
        Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
        log.info("Processing message with sequence number: {}", message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER));
        
        try {
            processAzureEventHubMessage(message);
            checkpointer.success().block();
            log.info("Checkpoint was updated successfully for sequence number: {}", message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER));
        } catch (Exception e) {
            log.error("Error processing event: {}", e.getMessage());
        }
    };
}
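
To see why a checkpoint that is never persisted leads to repeated deliveries, here is a minimal plain-Java sketch. It does not use the Azure SDK: CheckpointReplayDemo, CheckpointStore, and runConsumer are hypothetical names, and the "partition" is just an array of sequence numbers. It only models the rule that a consumer resumes after the last stored checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition read from the last checkpoint (not the Azure SDK). */
public class CheckpointReplayDemo {

    /** Hypothetical stand-in for the blob checkpoint store: remembers one sequence number. */
    static final class CheckpointStore {
        private long lastSequenceNumber = -1; // -1 means nothing has been checkpointed yet

        void save(long sequenceNumber) { lastSequenceNumber = sequenceNumber; }

        long load() { return lastSequenceNumber; }
    }

    /**
     * Processes every event after the stored checkpoint and returns the sequence numbers handled.
     * When checkpointWorks is false, processing succeeds but the checkpoint is never persisted.
     */
    static List<Long> runConsumer(long[] partition, CheckpointStore store, boolean checkpointWorks) {
        List<Long> processed = new ArrayList<>();
        for (long sequenceNumber : partition) {
            if (sequenceNumber <= store.load()) {
                continue; // covered by the checkpoint, so it is not delivered again
            }
            processed.add(sequenceNumber); // stands in for "save to the database"
            if (checkpointWorks) {
                store.save(sequenceNumber);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        long[] partition = {0, 1, 2};

        CheckpointStore healthy = new CheckpointStore();
        runConsumer(partition, healthy, true);
        System.out.println("restart, checkpoint persisted: " + runConsumer(partition, healthy, true));

        CheckpointStore broken = new CheckpointStore();
        runConsumer(partition, broken, false);
        System.out.println("restart, checkpoint lost: " + runConsumer(partition, broken, false));
    }
}
```

In this model the second run with a working checkpoint handles nothing, while the second run without one handles all three events again. If your logs show the same sequence number arriving twice, that is the pattern to look for.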

Service Method:

public Response processAzureEventHubMessage(Message<String> message) {
    Response response = new Response();
    try {
        Data data = eventConverter.convertToObject(message.getPayload());
        Request request = new Request();

        String playerId = data.getEvent().getPlayerEvent().getPlayerId().trim();
        String status = data.getEvent().getPlayerEvent().getType().trim().toUpperCase();

        // Check if the message with the given playerId and status has already been processed
        if (isMessageAlreadyProcessed(playerId, status)) {
            log.info("Message with playerId {} and status {} has already been processed.", playerId, status);
            return response;
        }

        // Process the data and save to the database
        // Your logic here

        // Mark the message as processed
        markMessageAsProcessed(playerId, status);

    } catch (Exception e) {
        log.error("Error processing event: {}", e.getMessage());
    }
    return response;
}

Utility Methods:

  • Note: Implement utility methods to check and mark messages as processed.

private boolean isMessageAlreadyProcessed(String playerId, String status) {
    // Implement logic to check if the message with the given playerId and status has already been processed
    // For example, query the database or check an in-memory cache
    throw new UnsupportedOperationException("Not implemented yet");
}

private void markMessageAsProcessed(String playerId, String status) {
    // Implement logic to mark the message as processed
    // For example, update the database or an in-memory cache
}
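
One possible way to back these utility methods is sketched below. It is an assumption on my part, not part of your code: ProcessedEventTracker and markIfFirstDelivery are hypothetical names, and the sketch keys on partition ID plus sequence number instead of playerId and status. Event Hubs assigns each event a sequence number that is unique within its partition, so this key identifies a redelivery without also rejecting a legitimately repeated status for the same player.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory tracker; its state is lost on restart, so a database table is safer in production. */
public class ProcessedEventTracker {

    // Keys have the form "partitionId:sequenceNumber"
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /**
     * Atomically records the event and reports whether this is its first delivery.
     * Returns false for a duplicate, so the caller can skip processing.
     */
    public boolean markIfFirstDelivery(String partitionId, long sequenceNumber) {
        return seen.add(partitionId + ":" + sequenceNumber);
    }

    public static void main(String[] args) {
        ProcessedEventTracker tracker = new ProcessedEventTracker();
        System.out.println(tracker.markIfFirstDelivery("0", 42L)); // first delivery
        System.out.println(tracker.markIfFirstDelivery("0", 42L)); // redelivery of the same event
        System.out.println(tracker.markIfFirstDelivery("0", 43L)); // next event
    }
}
```

Because the check and the insert happen in a single Set.add call, two threads cannot both conclude that the same event is new. For durability across restarts, the same idea can be expressed as a unique constraint on (partition, sequence number) in the tracker table.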

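  • Use Azure Monitor and Log Analytics to run queries and set up alerts. Below is a sample query for counting checkpoint operations per hour; the OperationName value is illustrative, so check which operation names actually appear in your AzureDiagnostics logs. The checkpoints themselves are stored as blobs in the configured storage container (messages-container in your config), so you can also inspect them there.
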
AzureDiagnostics
| where ResourceType == "EVENTHUBS" and Resource == "YOUR_EVENTHUB_NAME"
| where OperationName == "CheckpointSuccess"
| summarize count() by bin(TimeGenerated, 1h)

Upvotes: 0
