antony prakash

Reputation: 31

How to consume messages from Azure Event Hub with only a subscriber?

I send messages to an Azure Event Hub, but I am not able to receive them from the event hub.

string eventHubConnectionString = "<connection string>";
string eventHubName = "<event Hub name>";
string storageAccountName = "<event hub storage>";
string storageAccountKey = "<storage Key>";
string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",storageAccountName, storageAccountKey);


EventProcessorHost eventProcessorHost = new EventProcessorHost("message", eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();


IEventProcessor:
class SimpleEventProcessor : IEventProcessor
{
    Stopwatch checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine(string.Format("Processor Shutting Down.  Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason.ToString()));
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine(string.Format("SimpleEventProcessor initialize.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
        this.checkpointStopWatch = new Stopwatch();
        this.checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());

            Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                context.Lease.PartitionId, data));
        }

        // Checkpoint every 5 minutes, so that a restarted worker resumes from at most 5 minutes back.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            lock (this)
            {
                // Restart (not Reset) so the stopwatch keeps running for the next interval.
                this.checkpointStopWatch.Restart();
            }
        }
    }

}

It shows the following error: an AggregateException, "One or more errors occurred." The message details say "No such host is known". What is the EventProcessorHost host name?

The error is thrown at this line: eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();

The IEventProcessor implementation is never called. Is there any other method for consuming messages from an event hub?

Upvotes: 0

Views: 4421

Answers (2)

jsturtevant

Reputation: 2610

I have had success building an event processor using the sample code located here.

It is hard to tell from your sample code what the error is. It could be a typo in your connection string, event hub name, or storage account name, none of which are shown (you were right not to post a connection string containing sensitive data).

The difference between the example and your code is how the event hub information is supplied: the example obtains it through an EventHubClient created from the connection string. Try building your EventProcessorHost like the example below:

    EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString, this.eventHubName); 

    // Get the default Consumer Group 
    defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup(); 
    string blobConnectionString = ConfigurationManager.AppSettings["AzureStorageConnectionString"]; // Required for checkpoint/state 
    eventProcessorHost = new EventProcessorHost("singleworker", eventHubClient.Path, defaultConsumerGroup.GroupName, this.eventHubConnectionString, blobConnectionString); 
    eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait(); 

Upvotes: 0

Circus Ranger

Reputation: 189

You can trace the exception and look at its inner exception while debugging, which should give you a clue about the real reason. I had this stupid exception too, and in my case it was because the eventHubName variable passed to EventProcessorHost has to be lowercase. It may contain only letters, numbers, and '-', and each '-' has to be followed by a letter or number, so '--' is not supported. eventHubName should also begin with a letter.

Even if the event hub name is "myEventHub123" your variable has to be like:

string eventHubName = "myeventhub123";
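The two suggestions above (inspect the inner exception, lowercase the name) can be sketched roughly as follows. This is only an illustration, not the library's API: `EventHubDiagnostics`, `NormalizeEventHubName`, and `DescribeFailure` are hypothetical helper names, and a pre-faulted task stands in for the real `RegisterEventProcessorAsync<SimpleEventProcessor>().Wait()` call so the snippet runs without an Azure connection.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class EventHubDiagnostics
{
    // Hypothetical helper: lowercases the name, as the answer above suggests.
    public static string NormalizeEventHubName(string name)
    {
        return name.Trim().ToLowerInvariant();
    }

    // Hypothetical helper: lists every exception wrapped by a failed .Wait() call.
    public static string DescribeFailure(AggregateException ex)
    {
        return string.Join(" | ",
            ex.Flatten().InnerExceptions.Select(e => e.GetType().Name + ": " + e.Message));
    }
}

static class Program
{
    static void Main()
    {
        string eventHubName = EventHubDiagnostics.NormalizeEventHubName("myEventHub123");
        Console.WriteLine(eventHubName);

        try
        {
            // Assumption: stand-in for
            // eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
            Task.FromException(new InvalidOperationException("No such host is known")).Wait();
        }
        catch (AggregateException ex)
        {
            // .Wait() wraps the real failure; print the inner exceptions instead.
            Console.WriteLine(EventHubDiagnostics.DescribeFailure(ex));
        }
    }
}
```

Wrapping the real `.Wait()` call in the same `try`/`catch` should surface the underlying exception type and message instead of the generic "One or more errors occurred."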

Hope this helps someone.

Upvotes: 2
