Reputation: 36
I have a simple Event Hubs client in Java (only 1 partition):
public static void main(String[] args) throws Exception {
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
        .connectionString(storageConnectionString)
        .containerName(storageContainerName)
        .buildAsyncClient();

    // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
    EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
        .connectionString(connectionString, eventHubName)
        .consumerGroup("$default")
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)//.checkpointStore(new SampleCheckpointStore());
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

    // Use the builder object to create an event processor client
    EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

    System.out.println("Starting event processor");
    eventProcessorClient.start();

    System.out.println("Press enter to stop.");
    System.in.read();

    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");

    System.out.println("Exiting process");
}
If I start the client first and then send messages, it works as expected: the messages are processed.
If I stop the client, send messages to the event hub, and then start the client again, the messages sent while it was stopped are not processed at all. Messages sent afterwards are processed. Why?
If I stop the client, delete the checkpoint data in Azure Blob Storage, and then start the client, the messages already in the event hub are not processed. Messages sent afterwards are processed. Why?
Libraries used:
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.12.2</version>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.13.0</version>
</dependency>
I tried
Upvotes: 1
Views: 1093
Reputation: 7745
In its default configuration, the Java processor positions the reader for each partition at EventPosition.latest() when no checkpoint is found, which means that it reads only events published after the processor has started. (The full set of positioning logic can be seen here.)
When building the processor, the initialPartitionEventPosition map can be provided to specify a different starting position for each partition.
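The precedence described above can be sketched with a small standard-library model. This is only an illustration, not the SDK's code: the class StartPositionModel, the method resolveStart, and the plain string labels are hypothetical stand-ins, and the real processor works with EventPosition objects rather than strings.

```java
import java.util.Map;

// Simplified, hypothetical model of how a start position is chosen per partition.
// It mirrors the order described in the answer; it is not the SDK implementation.
final class StartPositionModel {

    /**
     * @param partitionId                   partition being claimed, e.g. "0"
     * @param checkpointSequenceNumber      last checkpointed sequence number, or null if none is stored
     * @param initialPartitionEventPosition per-partition starting positions (labels stand in for EventPosition)
     */
    static String resolveStart(String partitionId,
                               Long checkpointSequenceNumber,
                               Map<String, String> initialPartitionEventPosition) {
        // 1. A stored checkpoint wins: resume after the checkpointed event.
        if (checkpointSequenceNumber != null) {
            return "after sequence number " + checkpointSequenceNumber;
        }
        // 2. No checkpoint: use the position configured for this partition, if any.
        String configured = initialPartitionEventPosition.get(partitionId);
        if (configured != null) {
            return configured;
        }
        // 3. Nothing configured: default to latest, so older events are skipped.
        return "latest";
    }

    public static void main(String[] args) {
        // No checkpoint, no map entry: the situation in the question.
        System.out.println(resolveStart("0", null, Map.of()));
        // No checkpoint, but partition "0" is mapped to the earliest position.
        System.out.println(resolveStart("0", null, Map.of("0", "earliest")));
        // A checkpoint exists, so the map entry is ignored.
        System.out.println(resolveStart("0", 41L, Map.of("0", "earliest")));
    }
}
```

On the real builder, the second case would correspond to passing a map from partition id "0" to EventPosition.earliest() to initialPartitionEventPosition. Note that the map only matters while no checkpoint exists for the partition; once one is stored, the processor resumes from it.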
Upvotes: 1