Reputation: 3067
I am currently working through implementing an Event Hub reader using EventProcessorHost and a simple IEventProcessor implementation. I have confirmed that telemetry data is being written into the Event Hub using Paolo Salvatori's excellent Service Bus Explorer. I have successfully configured the EventProcessorHost to use a storage account for leases and checkpoints. I can see the Event Hub data files in the storage account. The problem that I am seeing at this point is that the IEventProcessor implementation does not appear to be reading anything from the Event Hub.
I am not receiving any exceptions. The test console app is connecting to the storage account without issue. I have noticed that the logging statement I added to the constructor is never being called, so it looks like the receiver is never actually being created. I feel like I am missing something simple. Can anyone help me determine what I have missed? Thank you!
IEventProcessor Implementation:
namespace Receiver
{
internal class SimpleEventProcessor : IEventProcessor
{
private Stopwatch _checkPointStopwatch;
public SimpleEventProcessor()
{
Console.WriteLine("SimpleEventProcessor created");
}
#region Implementation of IEventProcessor
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}",
context.Lease.PartitionId, context.Lease.Offset);
_checkPointStopwatch = new Stopwatch();
_checkPointStopwatch.Start();
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var data in messages.Select(eventData => Encoding.UTF8.GetString(eventData.GetBytes())))
{
Console.WriteLine("Message received. Partition: '{0}', Data: '{1}'", context.Lease.PartitionId,
data);
}
if (_checkPointStopwatch.Elapsed > TimeSpan.FromSeconds(30))
{
await context.CheckpointAsync();
_checkPointStopwatch.Restart();
}
}
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor shutting down. Partition '{0}', Reason: {1}", context.Lease.PartitionId,
reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
#endregion
}
}
Test Console Code:
namespace EventHubTestConsole
{
internal class Program
{
private static void Main(string[] args)
{
AsyncPump.Run((Func<Task>) MainAsync);
}
private static async Task MainAsync()
{
const string eventHubConnectionString =
"Endpoint=<EH endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
const string eventHubName = "<event hub name>";
const string storageAccountName = "<storage account name>";
const string storageAccountKey = "<valid storage key>";
var storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
storageAccountName, storageAccountKey);
Console.WriteLine("Connecting to storage account with ConnectionString: {0}", storageConnectionString);
var eventProcessorHostName = Guid.NewGuid().ToString();
var eventProcessorHost = new EventProcessorHost(
eventProcessorHostName,
eventHubName,
EventHubConsumerGroup.DefaultGroupName,
eventHubConnectionString,
storageConnectionString);
var epo = new EventProcessorOptions
{
MaxBatchSize = 100,
PrefetchCount = 1,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
};
epo.ExceptionReceived += OnExceptionReceived;
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo);
Console.WriteLine("Receiving. Please enter to stop worker.");
Console.ReadLine();
}
public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
{
Console.WriteLine("Event Hub exception received: {0}", args.Exception.Message);
}
}
Upvotes: 2
Views: 2389
Reputation: 2331
It looks like the problem is with your value for EventProcessorOptions.PrefetchCount.
I changed your code slightly as shown here (removing the AsyncPump and shutting down the receivers cleanly). I found that RegisterEventProcessorAsync throws an exception if the PrefetchCount is less than 10.
namespace EventHubTestConsole
{
internal class Program
{
private static void Main(string[] args)
{
const string eventHubConnectionString =
"Endpoint=<EH endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
const string eventHubName = "<event hub name>";
const string storageAccountName = "<storage account name>";
const string storageAccountKey = "<valid storage key>";
var storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
storageAccountName, storageAccountKey);
Console.WriteLine("Connecting to storage account with ConnectionString: {0}", storageConnectionString);
var eventProcessorHostName = Guid.NewGuid().ToString();
var eventProcessorHost = new EventProcessorHost(
eventProcessorHostName,
eventHubName,
EventHubConsumerGroup.DefaultGroupName,
eventHubConnectionString,
storageConnectionString);
var epo = new EventProcessorOptions
{
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
};
epo.ExceptionReceived += OnExceptionReceived;
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo).Wait();
Console.WriteLine("Receiving. Please enter to stop worker.");
Console.ReadLine();
eventProcessorHost.UnregisterEventProcessorAsync().Wait();
}
public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
{
Console.WriteLine("Event Hub exception received: {0}", args.Exception.Message);
}
}
}
Upvotes: 1