viblo

Reputation: 4603

How to trigger to multiple eventhubs in azure webjob or function

I have multiple Azure Event Hubs located in different Azure regions. I want to write a WebJob/Function that can receive messages from all of these Event Hubs without having to hard-code which ones to listen to.

So, normally you would have something like this, one function that can receive messages from one EH as defined in config:

public void Func([EventHubTrigger("%eventhubInName%")] string data)

In my case I have 6 different Event Hubs spread out over 6 Azure regions. What I want is to somehow set up my function at startup to listen on several Event Hubs, so that I don't need to hard-code 6 identical functions (Func1, Func2, ...) or host my WebJob 6 times.

Is this possible somehow? For example by doing something during startup of the webjob?

Upvotes: 4

Views: 1548

Answers (2)

Sheetal Shivagunde

Reputation: 71

I know this thread is old, but I recently worked on a similar requirement (we were asked to add an Event Hub dynamically whenever a new customer is introduced).

I resolved it by developing a WebJob that, alongside its main functionality, pulls the Event Hub connection details from the database every hour. If a new Event Hub has been added to the database, it starts a new thread for that Event Hub only. If an Event Hub has been removed from the database, it stops that particular thread.

I achieved this with a static list that holds a unique ID for each Event Hub.

Sample code:

private static readonly List<Guid> _existingEventHubs = new List<Guid>(); // static list of unique identifiers, one per Event Hub. It is empty at first.


List<EventHubMappingDetails> eventHubDetails = // a database call here. This list will have eventHub connection details.

            if (eventHubDetails.Any())
            {

                //Logic to create new ConsumerRead Thread only for newly created event hubs and to kill the threads if the eventhub has been removed.
                if (_existingEventHubs.Count > 0) // check if there are already any entries available in the list
                {
                    // addDetails contains the details of newly added Event Hubs
                    List<EventHubMappingDetails> addDetails = eventHubDetails.Where(_ => !_existingEventHubs.Contains(_.EHId)).ToList();

                    // removeDetails contains the IDs of Event Hubs that have been removed
                    List<Guid> removeDetails = _existingEventHubs.Where(_ => !eventHubDetails.Select(o => o.EHId).Contains(_)).ToList();
                    if (addDetails.Count > 0)
                    {
                        _existingEventHubs.AddRange(addDetails.Select(_ => _.EHId));

                        eventHubConnectionDetails = addDetails.Select(_ => new EventHubConnectionDetails()
                        {
                            connectionString = $"Endpoint={_.EHHostNameRead};SharedAccessKeyName={_.EHSharedAccessKeyNameRead};SharedAccessKey={_.EHSharedAccessKeyValueRead};EntityPath={_.EHEntityPathRead}",
                            consumerGroup = /* consumer group name here */,
                            eventHubName = _.EHEntityPathRead,
                            ID = _.EHId.ToString()
                        }).ToList();

                        //Call to ConsumerReadEvent method to create new thread for all the newly created eventHubs
                        await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
                    }
                    if (removeDetails.Count > 0)
                    {
                        List<string> EHTobeRemovedList = removeDetails.Select(_ => _.ToString()).ToList();
                        _existingEventHubs.RemoveAll(_ => removeDetails.Contains(_));// remove all the EventHub unique identifiers from the static list if these are removed from data base
                    }

                }
                else
                {
                    _existingEventHubs.AddRange(eventHubDetails.Select(_ => _.EHId));

                    eventHubConnectionDetails = eventHubDetails.Select(_ => new EventHubConnectionDetails()
                    {
                        connectionString = $"Endpoint={_.EHHostNameRead};SharedAccessKeyName={_.EHSharedAccessKeyNameRead};SharedAccessKey={_.EHSharedAccessKeyValueRead};EntityPath={_.EHEntityPathRead}",
                        consumerGroup = /* consumer group name here */,
                        eventHubName = _.EHEntityPathRead,
                        ID = _.EHId.ToString()
                    }).ToList();
                    await _eventHubConsumerBusiness.ConsumerReadEvent(eventHubConnectionDetails);
                }
            }
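The add/remove bookkeeping above boils down to two set differences. Here is a minimal, self-contained sketch of that idea, assuming the hub IDs are plain Guid values. HubDiff and Compute are hypothetical names used for illustration only and are not part of the code above:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HubDiff
{
    // Returns the IDs to start (present in the database but not yet running)
    // and the IDs to stop (running but no longer present in the database).
    public static (List<Guid> ToAdd, List<Guid> ToRemove) Compute(
        IEnumerable<Guid> running, IEnumerable<Guid> fromDatabase)
    {
        var runningSet = new HashSet<Guid>(running);
        var databaseSet = new HashSet<Guid>(fromDatabase);

        var toAdd = databaseSet.Where(id => !runningSet.Contains(id)).ToList();
        var toRemove = runningSet.Where(id => !databaseSet.Contains(id)).ToList();
        return (toAdd, toRemove);
    }
}
```

With this shape, an empty running list simply yields every database ID in ToAdd, so a separate branch for the first run is not strictly needed. An empty database result yields every running ID in ToRemove.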

Then add a Parallel.ForEach over the eventHubConnectionDetails list, so that each Event Hub gets its own thread.

In each thread, run the following code:

EventProcessorClient processor = new EventProcessorClient(storageClient, eventHubConnectionDetails.consumerGroup, eventHubConnectionDetails.connectionString, eventHubConnectionDetails.eventHubName);
try
{
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    await processor.StartProcessingAsync();

    // Keep processing for as long as _existingEventHubs still contains this Event Hub's ID.
    // ID is stored as a string, so parse it back to a Guid before the lookup.
    Guid hubId = Guid.Parse(eventHubConnectionDetails.ID);
    while (_existingEventHubs.Contains(hubId))
    {
        await Task.Delay(TimeSpan.FromMinutes(5));
    }

    await processor.StopProcessingAsync();
}
catch
{
    // Exceptions are swallowed here; log them in production code.
}
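Polling the static list every five minutes means a removed Event Hub can keep running for up to five minutes, and a List<Guid> shared across threads is not thread-safe. One possible alternative, which is not what the code above does, is to keep a CancellationTokenSource per hub. The sketch below assumes a hypothetical HubRunner class and a caller-supplied worker delegate; neither is part of the Azure SDK:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class HubRunner
{
    private readonly object _gate = new object();
    private readonly Dictionary<Guid, (CancellationTokenSource Cts, Task Work)> _running
        = new Dictionary<Guid, (CancellationTokenSource Cts, Task Work)>();
    private readonly Func<Guid, CancellationToken, Task> _worker;

    // worker is hypothetical: it should run until the token is cancelled.
    public HubRunner(Func<Guid, CancellationToken, Task> worker) => _worker = worker;

    // Starts a worker for the hub unless one is already running.
    public bool Start(Guid hubId)
    {
        lock (_gate)
        {
            if (_running.ContainsKey(hubId)) return false;
            var cts = new CancellationTokenSource();
            var work = Task.Run(() => _worker(hubId, cts.Token));
            _running[hubId] = (cts, work);
            return true;
        }
    }

    // Signals the hub's worker to stop and waits for it to finish.
    public async Task<bool> StopAsync(Guid hubId)
    {
        (CancellationTokenSource Cts, Task Work) entry;
        lock (_gate)
        {
            if (!_running.TryGetValue(hubId, out entry)) return false;
            _running.Remove(hubId);
        }

        entry.Cts.Cancel();
        try { await entry.Work; }
        catch (OperationCanceledException) { /* expected on cancellation */ }
        finally { entry.Cts.Dispose(); }
        return true;
    }
}
```

In this setup the worker would call StartProcessingAsync, await Task.Delay(Timeout.Infinite, token) until cancellation, and then call StopProcessingAsync, so removing a hub takes effect as soon as StopAsync is called.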

Upvotes: 4

Mikhail Shilkov

Reputation: 35134

A single Azure Function can't be linked to multiple triggers, and to my knowledge the WebJobs SDK can't do that either. Obviously, you could write your own WebJob without the SDK, but I guess that's not what you are asking for.

The easiest way is probably to write a helper method:

public void Impl(string data) { ... }

and then define 6 identical Functions with different Event Hubs:

public void Func1([EventHubTrigger("%hub1%", Connection = "Conn1")] string data) 
    => Impl(data);

public void Func6([EventHubTrigger("%hub6%", Connection = "Conn6")] string data) 
    => Impl(data);

You can also create 6 function.json files in 6 folders manually, and make them point to the very same Impl function, but with different Event Hub settings.

Upvotes: 1
