Reputation: 1
I have an EventHubConsumer that consumes data from Azure EventHub. So I have Event Grid Topic that is consuming from external Producer. The event grid topic after consuming from Producer it pushes data to EventHub and I am then consuming from the EventHub in my spring boot application. The consumer is working and I am able to consume events/me ssages and I save them to a database tracker table. The issue I am seeing is that I see duplicates are been processed and saved. The producer is not sending duplicates.
So this is the architecture I have:
Producer: Event Grid Topic: One EventHub consumer (EventHub subscription to event grid): One partition: One consumer group: Spring boot application
I debugged my application and I see that its not my service method processAzureEventHubMessage that is causing the issue because I see that it processes a 1 message at a time and finishes it by saving it to database, and I see "Checkpoint was updated successfully" after each execution of the method when it finishes processing a single message. I don't know if it's not checkpointing correctly, and I don't know why the same message comes again from eventHub to my application, but I set up all my config correctly and I am not sure If something is missing in my config or my code.
Please help if you see any bug in the code that might cause the duplication issue.
My Consumer method:
@Bean
public Consumer<Message<String>> consume(){
return message ->{
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
message.getPayload(),
message.getHeaders().get(AzureHeaders.PARTITION_KEY),
message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
message.getHeaders().get(EventHubsHeaders.OFFSET),
message.getHeaders().get(EventHubsHeaders.ENQUEUED)
);
try{
processAzureEventHubMessage(message);
checkpointer.success().block();
log.inf("Checkpoit was updated successfully", message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER));
}catch(Exception e){
log.error("Error processing event", e.getMessage());
}
};
}
My Service method that processes the message do some mapping and save it to a Database table
public Response processAzureEventHubMessage(Message<String> message){
Response reponse = new Response();
try{
Data data = eventConverter.convertToObject(message.getPayload());
Request request = new Request();
String player_id = data.getEvent().getPlayerEvent().getPlayerId().trim();
String status = data.getEvent().getPlayerEvent().getType().trim().toUpperCase();
// below is just some rest of logic that process the data and send it to database
}
}
return response;
}
My config yaml file for the eventHub Consumer:
spring:
cloud:
azure:
eventhubs:
connection-string: ......
processor:
checkpointer-store:
account-name: checkpointerstorage
container-name: messages-container
stream:
bindings:
consume-in-o:
destination: my-eventhub
group: my-consumer-group
eventhubs:
bindings:
consume-in-0:
consumer:
checkpoint:
mode: MANUAL
poller:
initial-delay: 50
fixed delay: 1000
function:
destination: consume
Upvotes: 0
Views: 159
Reputation: 3639
I don’t know why the same message comes again from eventHub to my application, but I set up all my config correctly and I am not sure If something is missing in my config or my code.
checkpointer.success().block()
, which blocks the current thread until the checkpoint is confirmed. Check that no exceptions are being swallowed and that the checkpoint is indeed being updated correctly.Consumer Method:
@Bean
public Consumer<Message<String>> consume() {
return message -> {
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
log.info("Processing message with sequence number: {}", message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER));
try {
processAzureEventHubMessage(message);
checkpointer.success().block();
log.info("Checkpoint was updated successfully for sequence number: {}", message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER));
} catch (Exception e) {
log.error("Error processing event: {}", e.getMessage());
}
};
}
Service Method:
public Response processAzureEventHubMessage(Message<String> message) {
Response response = new Response();
try {
Data data = eventConverter.convertToObject(message.getPayload());
Request request = new Request();
String playerId = data.getEvent().getPlayerEvent().getPlayerId().trim();
String status = data.getEvent().getPlayerEvent().getType().trim().toUpperCase();
// Check if the message with the given playerId and status has already been processed
if (isMessageAlreadyProcessed(playerId, status)) {
log.info("Message with playerId {} and status {} has already been processed.", playerId, status);
return response;
}
// Process the data and save to the database
// Your logic here
// Mark the message as processed
markMessageAsProcessed(playerId, status);
} catch (Exception e) {
log.error("Error processing event: {}", e.getMessage());
}
return response;
}
Utility Methods:
private boolean isMessageAlreadyProcessed(String playerId, String status) {
// Implement logic to check if the message with the given playerId and status has already been processed
// For example, query the database or check an in-memory cache
}
private void markMessageAsProcessed(String playerId, String status) {
// Implement logic to mark the message as processed
// For example, update the database or an in-memory cache
}
AzureDiagnostics
| where ResourceType == "EVENTHUBS" and Resource == "YOUR_EVENTHUB_NAME"
| where OperationName == "CheckpointSuccess"
| summarize count() by bin(TimeGenerated, 1h)
Upvotes: 0