Allamanda Weitgereist
Allamanda Weitgereist

Reputation: 125

Async processing of events in EventProcessorClient

We are migrating from an old SDK-Version to version 4 SDK of Azure EventHub and we are wondering if we still need to implement asynchronous handling of events by our selves. We are implementing the EventProcessorClient with the EventProcessorClientBuilder. so my question: Is the onEvent method called asynchronous in this example or not?

new EventProcessorClientBuilder()
        .connectionString(connectionString)
        .checkpointStore(new BlobCheckpointStore(blobContainer))
        .processEvent(this::onEvent)
        .buildEventProcessorClient();

I dug deep into the Library and found that in the PartitionPumpManager an EventHubConsumerAsyncClient is build. But Iam not 100% certain.

If it is asynchronous, is there any way to maximum number of tasks and/or a way to give a timeout for a single task?

Upvotes: 0

Views: 711

Answers (1)

Anu Thomas Chandy
Anu Thomas Chandy

Reputation: 570

The invocation of 'onEvent' is asynchronous, i.e., it won't block the main thread.

As shown below, there are four different (mutually exclusive) ways to register the callback to receive events. Each takes the callback to deliver events as the first parameter -

processEvent(Consumer<EventContext> onEvent);
processEvent(Consumer<EventContext> onEvent, Duration timeout);
processEvent(Consumer<EventBatchContext> onBatchOfEvents, int batchSize);
processEvent(Consumer<EventBatchContext> onBatchOfEvents, int batchSize, Duration timeout);

Option 2 has the timeout as a second parameter, i.e., how long to wait for an event to be available. If no event arrives within the timeout, the library invokes the callback with an EventContext object, and invoking getEventData() on this context object returns null.

Options 3 and 4 allow the application to register a callback to handle a batch of events; the second parameter indicates the desired number of events in each batch. The callback will be invoked with a EventBatchContext object where the batch can be accessed from EventBatchContext::getEvents().

Additionally, Option 4 allows specifying how long to wait for a batch of size N (where N is the second parameter) to be delivered to the callback. Upon timeout timer expiration, if there are only M events (where M < N), the batch with M elements will be delivered to the callback. If no events are available within the timeout, then the EventBatchContext object delivered to callback returns an empty list when its getEvents() is called.

I think Option 3 or 4 are what you're looking for, which we recommend using because callback for each event has a slight overhead.

Upvotes: 2

Related Questions