Reputation: 370
I'm very new to using EventProcessorHost and IEventProcessor, and I'm trying to figure out how to get my data out of the event processor class. I currently have everything up and working if I just want to log new messages to the console.
My current implementation (and I'm not even sure whether it is acceptable, let alone good practice) creates a static variable and stores the data in it as it comes in so that another processor can collect it. Is this okay to do, or is there a better, cleaner way to access the data?
This is what I have so far (the locking mechanism is extremely basic and will be fixed once I get the rest of the code working):
internal class Receiver
{
    public static List<string> incommingMessagesList = new List<string>();
    public static bool fIsDataListLocked = false;
    private EventProcessorHost m_EPHClient;
    ...
    Console.WriteLine( "Registering EventProcessor..." );
    await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
}
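public class SimpleEventProcessor : IEventProcessor
{
    ...
    public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
    {
        foreach( var eventData in messages )
        {
            // Decode the UTF-8 payload from the event body segment.
            var message = Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count );
            // A while( !flag ) loop around the Add call would re-add the same message
            // forever (the flag is reset to false on every pass) and would skip the
            // message whenever the flag was already set, so take a real lock instead.
            // Whoever reads the list must lock on the same object.
            lock( Receiver.incommingMessagesList )
            {
                Receiver.incommingMessagesList.Add( message );
            }
        }
        // Checkpoint once the whole batch has been stored.
        return context.CheckpointAsync();
    }
}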
UPDATED:
As requested, a little more information:
Basically, I'm pulling data from two different ends of a pipeline to validate that all messages make it through and to track their throughput. One end is the Event Hub; the other comes from an LwM2M server as an HTTP request. So I have a controller process running that needs to get data from both ends in order to clean and analyze it. Like I said, I'm new to event processors, but it didn't make sense to me to have the EventProcessorHost collect both sets of data and then clean and analyze them. I could definitely change things to work that way, but it seems clunky.
Upvotes: 1
Views: 594
Reputation: 29720
In a typical scenario, an event processor receives the data and persists it as quickly as possible. Multiple event processor instances read the data from the different Event Hub partitions.
In your case, you want to send the data somewhere else and process it there in combination with another stream of data. An in-memory collection like a List is probably not the best way to do this.
You are going to need some kind of producer/consumer implementation.
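To illustrate the producer/consumer shape, here is a minimal in-process sketch based on BlockingCollection<T> from System.Collections.Concurrent. The names MessageBuffer and RunDemoAsync are made up for this example, and the two producer tasks are only stand-ins for ProcessEventsAsync and the HTTP handler. It shows the hand-off pattern only; like the static List, it loses its contents if the process dies.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical hand-off buffer shared by both receivers and the analyzer.
public static class MessageBuffer
{
    // Bounded, so a slow consumer applies back-pressure instead of exhausting memory.
    public static readonly BlockingCollection<(string Source, string Body)> Messages =
        new BlockingCollection<(string Source, string Body)>(boundedCapacity: 1000);
}

public static class Program
{
    // Runs two stand-in producers and one consumer; returns the number of messages consumed.
    public static async Task<int> RunDemoAsync()
    {
        var received = new List<string>();

        // Consumer: blocks until items arrive and finishes after CompleteAdding is called.
        var consumer = Task.Run(() =>
        {
            foreach (var (source, body) in MessageBuffer.Messages.GetConsumingEnumerable())
            {
                received.Add($"{source}: {body}");
            }
        });

        // Producers: placeholders for ProcessEventsAsync and the HTTP request handler.
        var eventHubProducer = Task.Run(() => MessageBuffer.Messages.Add(("eventhub", "msg-1")));
        var httpProducer = Task.Run(() => MessageBuffer.Messages.Add(("lwm2m", "msg-1")));
        await Task.WhenAll(eventHubProducer, httpProducer);

        // Signal that no more items will be added so the consumer loop can exit.
        MessageBuffer.Messages.CompleteAdding();
        await consumer;

        return received.Count;
    }

    public static async Task Main()
    {
        int count = await RunDemoAsync();
        Console.WriteLine($"Consumed {count} messages");
    }
}
```

In a real service the collection would stay open for the lifetime of the process, and CompleteAdding would only be called on shutdown.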
A possible solution is to write both data streams to a single destination such as an Azure Storage Queue. The main advantage is that, if there is a failure, the data is still persisted rather than lost. Your final processor can then read from the queue at its own speed.
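A rough sketch of that idea, assuming the Azure.Storage.Queues (v12) NuGet package, could look like the following. The queue name, the STORAGE_CONNECTION_STRING environment variable, the "source|body" message format, and the helper names are all assumptions made for illustration, and the code needs a real storage account (or a local emulator) to run.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;          // NuGet: Azure.Storage.Queues (v12), assumed here
using Azure.Storage.Queues.Models;

public static class QueueSketch
{
    // Called by each producer (the event processor and the HTTP handler).
    // The "source|body" format is a made-up convention for this example.
    public static async Task EnqueueAsync(QueueClient queue, string source, string body)
    {
        await queue.SendMessageAsync($"{source}|{body}");
    }

    // Called by the final processor, at whatever pace it can sustain.
    public static async Task DrainOnceAsync(QueueClient queue)
    {
        QueueMessage[] batch = (await queue.ReceiveMessagesAsync(maxMessages: 32)).Value;
        foreach (QueueMessage message in batch)
        {
            Console.WriteLine(message.MessageText);
            // Delete only after the message has been handled, so a crash
            // before this point makes the message visible again later.
            await queue.DeleteMessageAsync(message.MessageId, message.PopReceipt);
        }
    }

    public static async Task Main()
    {
        // Assumption: the connection string is supplied through this environment variable.
        string connectionString = Environment.GetEnvironmentVariable("STORAGE_CONNECTION_STRING");
        var queue = new QueueClient(connectionString, "pipeline-messages");
        await queue.CreateIfNotExistsAsync();

        await EnqueueAsync(queue, "eventhub", "msg-1");
        await EnqueueAsync(queue, "lwm2m", "msg-1");
        await DrainOnceAsync(queue);
    }
}
```

Storage queue messages are limited in size (64 KB), so larger payloads would need to be stored elsewhere and referenced from the message.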
Upvotes: 1