Veronica_Zotali

Reputation: 252

How to pass a dependency to a class that implements IEventProcessor (Event Hub)

I have the following problem.

We use Event Hub. In the following class, we implement IEventProcessor and, as you can see, use a Service Locator. We can't get it to work with constructor or property injection. It seems that Castle Windsor can't resolve dependencies when the class implements IEventProcessor. Is that a known issue, or is there something I need to do to get it working?

Below is the code:

public class EventProcessor : IEventProcessor
{
    private readonly IEventService _eventService;
    private readonly ILogger _logger;
    private readonly Lazy<RetryPolicy> _retryPolicy;
    private readonly IConfigurationProvider _configurationProvider;

    public EventProcessor()
    {
        try
        {
            _eventService = ContainerProvider.Current.Container.Resolve<IEventService>();
            _logger = ContainerProvider.Current.Container.Resolve<ILogger>();
            _configurationProvider = ContainerProvider.Current.Container.Resolve<IConfigurationProvider>();

        }
        catch (Exception exception)
        {
            _logger.WriteError(string.Format("Error occurred when initializing EventProcessor: '{0}'", exception));
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        var eventsList = events.ToList();
        EventData lastEvent = null;
        foreach (var eventData in eventsList)
        {
            _logger.WriteVerbose(string.Format("Consuming {0} events...", eventsList.Count()));
            _eventService.ProcessEvent(eventData);
            lastEvent = eventData;
        }

        if (lastEvent != null)
        {
            await AzureServiceBusRetryPolicy.ExecuteAsync(async () => await context.CheckpointAsync(lastEvent));
        }
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        _logger.WriteInfo("EventHub processor was closed for this reason: " + reason);

        if (reason == CloseReason.Shutdown)
        {
            await AzureServiceBusRetryPolicy.ExecuteAsync(async () => await context.CheckpointAsync());
        }

    }


}

Thanks

Upvotes: 5

Views: 1533

Answers (2)

RagtimeWilly

Reputation: 5445

I'm using Autofac but ran into the same problem.

I solved it by implementing IEventProcessorFactory and passing that factory to the EventProcessorHost when registering the processor.

To give an example, my wrapper around EventProcessorHost looks something like this:

public class EventHubProcessorHost
{
    private readonly IEventProcessorFactory _eventProcessorFactory;
    private readonly string _serviceBusConnectionString;
    private readonly string _storageConnectionString;
    private readonly string _eventHubName;

    public EventHubProcessorHost(IEventProcessorFactory eventProcessorFactory, string serviceBusConnectionString, string storageConnectionString, string eventHubName)
    {
        _eventProcessorFactory = eventProcessorFactory;
        _serviceBusConnectionString = serviceBusConnectionString;
        _storageConnectionString = storageConnectionString;
        _eventHubName = eventHubName;
    }

    public void Start()
    {
        var builder = new ServiceBusConnectionStringBuilder(_serviceBusConnectionString)
        {
            TransportType = TransportType.Amqp
        };

        var client = EventHubClient.CreateFromConnectionString(builder.ToString(), _eventHubName);

        try
        {
            var eventProcessorHost = new EventProcessorHost("singleworker",
              client.Path, client.GetDefaultConsumerGroup().GroupName, builder.ToString(), _storageConnectionString);

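            // Block until registration completes so that failures reach the catch below.
            eventProcessorHost.RegisterEventProcessorFactoryAsync(_eventProcessorFactory).GetAwaiter().GetResult();
        }
        catch (Exception exp)
        {
            Console.WriteLine("Error registering the event processor factory: " + exp.Message);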
        }
    }
}

And the factory I pass in has a reference to my IoC container:

public class MyEventProcessorFactory : IEventProcessorFactory
{
    private readonly IComponentContext _componentContext;

    public MyEventProcessorFactory(IComponentContext componentContext)
    {
        _componentContext = componentContext;
    }

    public IEventProcessor CreateEventProcessor(PartitionContext context)
    {
        return _componentContext.Resolve<IEventProcessor>();
    }
}

This allows me to use constructor injection as normal in my EventProcessor:

public class MyEventProcessor : IEventProcessor
{
    private readonly IFoo _foo;

    public MyEventProcessor(IFoo foo)
    {
        _foo = foo;
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (var eventData in events)
        {
            // Processing code
        }

        await context.CheckpointAsync();
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}

Then I just wire everything up as normal in the Autofac container at start-up:

builder.RegisterType<Foo>().As<IFoo>();

builder.RegisterType<MyEventProcessor>().As<IEventProcessor>();

builder.Register(c => new MyEventProcessorFactory(c.Resolve<IComponentContext>())).As<IEventProcessorFactory>();
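Since the question is about Castle Windsor rather than Autofac, the same factory idea might look roughly like the sketch below. I haven't run this against the asker's setup. WindsorEventProcessorFactory and ContainerSetup are hypothetical names, and IFoo, Foo and MyEventProcessor are the types from the example above.

```csharp
using Castle.MicroKernel;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
using Microsoft.ServiceBus.Messaging;

// Hypothetical Windsor counterpart of MyEventProcessorFactory:
// it holds the kernel and resolves a processor whenever the host asks for one.
public class WindsorEventProcessorFactory : IEventProcessorFactory
{
    private readonly IKernel _kernel;

    public WindsorEventProcessorFactory(IKernel kernel)
    {
        _kernel = kernel;
    }

    public IEventProcessor CreateEventProcessor(PartitionContext context)
    {
        // Windsor builds MyEventProcessor and supplies its constructor dependencies.
        return _kernel.Resolve<IEventProcessor>();
    }
}

// Hypothetical helper showing the registrations.
public static class ContainerSetup
{
    public static IWindsorContainer Build()
    {
        var container = new WindsorContainer();

        container.Register(
            Component.For<IFoo>().ImplementedBy<Foo>(),
            // Transient, assuming each partition should get its own processor instance.
            Component.For<IEventProcessor>().ImplementedBy<MyEventProcessor>().LifestyleTransient(),
            // The factory is created by hand and registered as an instance.
            Component.For<IEventProcessorFactory>()
                .Instance(new WindsorEventProcessorFactory(container.Kernel)));

        return container;
    }
}
```

The resolved IEventProcessorFactory can then be handed to RegisterEventProcessorFactoryAsync exactly as in the host class above.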

Hope this helps.

Upvotes: 6

nozzleman

Reputation: 9649

This is not the way to inject dependencies using Castle Windsor. Given your code, constructor injection would work with a constructor that lists the dependencies as parameters. In your case, this would look something like:

public EventProcessor(IEventService eventService, ILogger logger, IConfigurationProvider configProvider)
{
    try
    {
        this._eventService = eventService;
        this._logger = logger; // however, i suggest using Property injection for that, see next example
        this._configurationProvider = configProvider;

    }
    catch (Exception exception)
    {
        this._logger.WriteError(string.Format("Error occurred when initializing EventProcessor: '{0}'", exception));
    }
}

You could then resolve the EventProcessor itself, and those dependencies will be injected.

Property injection works with public properties. You simply define them like normal properties and they get injected independently.

public ILogger Logger {get; set;}

Consider using Castle Windsor's way to inject loggers (see LoggingFacility in the Windsor docs). Maybe you want to use NLog or log4net (like me)? Then there is a ready-to-use NuGet package (Castle Windsor log4net integration).

Upvotes: 0
