Azure Event Hub - only new messages are processed

I have a simple Event Hub client in Java (the event hub has only 1 partition):

public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();

        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .connectionString(connectionString, eventHubName)
                .consumerGroup("$default")
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)//.checkpointStore(new SampleCheckpointStore());
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();

        System.out.println("Press enter to stop.");
        System.in.read();

        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");

        System.out.println("Exiting process");
    }

If I start the client first and then send messages, it works as expected: the messages are processed.

If I stop the client, send messages to the event hub, and then start the client again, the messages sent while it was stopped are not processed at all. Messages sent afterwards are processed. Why?

If I stop the client, delete the checkpoint data in Azure Blob Storage, and then start the client again, the messages already in the event hub are not processed. Messages sent afterwards are processed. Why?

Libraries used:

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs</artifactId>
            <version>5.12.2</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
            <version>1.13.0</version>
        </dependency>

I tried

Upvotes: 1

Views: 1093

Answers (1)

Jesse Squire

Reputation: 7745

In its default configuration, the Java processor positions the reader for each partition at EventPosition.latest() when no checkpoint is found, which means that it reads only events published after the processor has started. (The full set of positioning logic can be seen here.)
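
To make that fallback order concrete, here is a small plain-Java model of it. This is only a sketch of the behaviour described above, not the SDK's actual code: the class name, the method name and the use of strings to stand in for positions are all made up for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of how a starting position is chosen for a partition.
// It does not call the Azure SDK; positions are represented as plain strings.
public class StartPositionSketch {

    public static String resolveStartPosition(String partitionId,
                                              Optional<Long> checkpointedSequenceNumber,
                                              Map<String, String> initialPositions) {
        // 1. A stored checkpoint wins: resume right after the checkpointed event.
        if (checkpointedSequenceNumber.isPresent()) {
            return "after sequence number " + checkpointedSequenceNumber.get();
        }
        // 2. No checkpoint: use the initial position configured for this partition, if any.
        if (initialPositions.containsKey(partitionId)) {
            return initialPositions.get(partitionId);
        }
        // 3. Nothing configured: default to "latest", so only new events are read.
        return "latest";
    }

    public static void main(String[] args) {
        // Checkpoint present: resume from it.
        System.out.println(resolveStartPosition("0", Optional.of(41L), Map.of()));
        // Checkpoint deleted, nothing configured: falls back to "latest".
        System.out.println(resolveStartPosition("0", Optional.empty(), Map.of()));
        // Checkpoint deleted, initial position configured: that position is used.
        System.out.println(resolveStartPosition("0", Optional.empty(), Map.of("0", "earliest")));
    }
}
```

The second call corresponds to the scenario in the question where the checkpoint blob was deleted: with no checkpoint and no configured initial position, the reader starts at the end of the partition.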

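When building the processor, a map can be passed to initialPartitionEventPosition to specify a different starting position for each partition; for example, EventPosition.earliest() to read from the beginning of a partition when no checkpoint exists.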
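
As a rough sketch of what that could look like for the single-partition hub in the question: the environment variable names, the class name and the inline handlers below are assumptions made for illustration, and partition id "0" is assumed because the hub has one partition. Treat it as a starting point rather than a verified setup.

```java
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;

import java.util.HashMap;
import java.util.Map;

public class ReadFromEarliest {
    public static void main(String[] args) throws Exception {
        // Assumption: connection details come from environment variables with these hypothetical names.
        String storageConnectionString = System.getenv("STORAGE_CONNECTION_STRING");
        String storageContainerName = System.getenv("STORAGE_CONTAINER_NAME");
        String connectionString = System.getenv("EVENT_HUB_CONNECTION_STRING");
        String eventHubName = System.getenv("EVENT_HUB_NAME");

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();

        // Used only when no checkpoint exists for the partition.
        // Assumption: the single partition has id "0".
        Map<String, EventPosition> initialPositions = new HashMap<>();
        initialPositions.put("0", EventPosition.earliest());

        EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
                .connectionString(connectionString, eventHubName)
                .consumerGroup("$default")
                .initialPartitionEventPosition(initialPositions)
                .processEvent(eventContext -> {
                    System.out.println("Received: " + eventContext.getEventData().getBodyAsString());
                    // Record progress so that a restart resumes after this event.
                    eventContext.updateCheckpoint();
                })
                .processError(errorContext -> System.err.println("Error: " + errorContext.getThrowable()))
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .buildEventProcessorClient();

        eventProcessorClient.start();
        System.out.println("Press enter to stop.");
        System.in.read();
        eventProcessorClient.stop();
    }
}
```

Note that the map only applies when no checkpoint is found. If the event handler never calls updateCheckpoint(), no checkpoint is written, so each restart would begin from the configured initial position again; checkpointing on every event, as in this sketch, is simple but may be more frequent than you want in production.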

Upvotes: 1
