Tommy

Reputation: 370

Getting your data out of an EventProcessorHost

I'm very new to using EventProcessorHost and IEventProcessor, and I'm trying to figure out how to get my data out of the event processor class. I currently have everything up and working if I just want to log the new messages to the console.

My current implementation (and I'm not even sure if it is acceptable, let alone good practice) creates a static variable and stores the data in it as it comes in, so that another processor can collect it. Is this okay to do, or is there a better, cleaner way to access the data?

This is what I have so far (the locking mechanism is extremely basic and will be fixed when I get the rest of the code working):

internal class Receiver
{
    public static List<string> incommingMessagesList = new List<string>();
    public static bool fIsDataListLocked = false;

    private EventProcessorHost m_EPHClient;

    ...

    Console.WriteLine( "Registering EventProcessor..." );
    await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
}

public class SimpleEventProcessor : IEventProcessor
{
    ...

    public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
    {
        foreach( var eventData in messages )
        {
            // Placeholder lock: wait until the flag is free, then take it.
            // The check and the set are not atomic, so this is not truly thread-safe.
            while( Receiver.fIsDataListLocked ) { }
            Receiver.fIsDataListLocked = true;
            Receiver.incommingMessagesList.Add( Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count ) );
            Receiver.fIsDataListLocked = false;
        }
        return context.CheckpointAsync();
    }
}

UPDATED:

As requested a little more information:

Basically, I'm pulling data from two different ends of a pipeline to validate that all messages make it through and to track their throughput. One end is the Event Hub, but the other is coming from an LwM2M server as an HTTP request. So I have a controller process running that needs to get data from both ends in order to clean and analyze it. Like I said, I'm new to event processors, but it didn't make sense to me to have the EventProcessorHost handle collecting both sets of data and then cleaning and analyzing them. I could definitely change to do things that way, but it seems clunky.

Upvotes: 1

Answers (1)

Peter Bons

Reputation: 29720

In a typical scenario, an event processor receives the data and persists it as quickly as possible. Multiple event processor instances read the data from the different Event Hub partitions.

In your case you want to send the data somewhere else and process it there in combination with a second stream of data. An in-memory collection like a List is probably not the best way to do this:

  • It needs to be thread-safe, since several processor instances can write to it at the same time
  • On a crash, the data will be lost
  • You will need to manually remove processed data to prevent an ever-growing collection

You are going to need some kind of producer / consumer implementation.
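As a rough illustration of the producer / consumer idea, here is a minimal in-memory sketch built on BlockingCollection<string> from System.Collections.Concurrent. The MessageBuffer and Demo names, the capacity of 1000, and the "msg-N" strings are all made up for the example; in the real code the producer side would be the Add call inside ProcessEventsAsync. Note that this only covers the thread-safety and removal points above. Data is still lost on a crash.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical replacement for the static List<string> plus bool flag.
public sealed class MessageBuffer : IDisposable
{
    private readonly BlockingCollection<string> _queue;

    // Bounded capacity: producers block when the buffer is full,
    // so a slow consumer cannot make it grow without limit.
    public MessageBuffer(int capacity)
    {
        _queue = new BlockingCollection<string>(capacity);
    }

    // Producer side: safe to call from several threads at once.
    public void Add(string message) => _queue.Add(message);

    // Signals that no more messages will arrive.
    public void CompleteAdding() => _queue.CompleteAdding();

    // Consumer side: each item is removed as it is handed out; the
    // enumeration ends once CompleteAdding was called and the buffer is empty.
    public IEnumerable<string> Consume() => _queue.GetConsumingEnumerable();

    public void Dispose() => _queue.Dispose();
}

public static class Demo
{
    // Runs one producer and one consumer and returns what the consumer saw.
    public static List<string> Run()
    {
        using (var buffer = new MessageBuffer(capacity: 1000))
        {
            var consumer = Task.Run(() =>
            {
                var seen = new List<string>();
                foreach (var message in buffer.Consume())
                {
                    seen.Add(message); // clean / analyze here
                }
                return seen;
            });

            // Stand-in for the decoded event bodies (made-up values).
            foreach (var body in new[] { "msg-1", "msg-2", "msg-3" })
            {
                buffer.Add(body);
            }
            buffer.CompleteAdding();

            return consumer.Result;
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Demo.Run()));
    }
}
```

To hand such a buffer to the processor without a static field, one option (assuming your version of EventProcessorHost offers RegisterEventProcessorFactoryAsync) is to register an IEventProcessorFactory that passes the buffer into each processor's constructor.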

A possible solution is to write both data streams to a single destination, such as an Azure Storage Queue. The main advantage is that when there is a failure, the data is still persisted and not lost. Your final processor can then read from the queue at its own speed.

Upvotes: 1
