Lorenzo

Reputation: 29427

Rebus multiple Queues based on content

Setup: Rebus in an ASP.NET MVC project using SimpleInjector.

I need to create two handlers, each receiving messages from its own queue. Following what I found in this SO answer, I wrote similar code.

In a class library I have a class that implements SimpleInjector's IPackage, with code like this:

public void RegisterServices( Container container ) {
    container.Register<IHandleMessages<MyMessage>, MyMessageHandler>( Lifestyle.Scoped );
    IContainerAdapter adapter = new SimpleInjectorContainerAdapter( container );
    Configure.With( adapter )
             .Transport( t => t.UseAzureServiceBus( connectionString, "A_QUEUE_NAME", AzureServiceBusMode.Standard ) )
             .Options( oc => {
                 oc.SetNumberOfWorkers( 1 );
                 oc.SimpleRetryStrategy( errorQueueAddress: "A_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
             } )
             .Start();

    Configure.With( adapter )
             .Transport( t => t.UseAzureServiceBus( connectionString, "B_QUEUE_NAME" ) )
             .Options( oc => {
                 oc.SetNumberOfWorkers( 1 );
                 oc.SimpleRetryStrategy( errorQueueAddress: "B_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
             } )
             .Start();
}

However, when the debugger reaches the second Configure.With( ... ) call, it terminates with an error saying:

Type IBus has already been registered. If your intention is to resolve a collection of IBus implementations, use the RegisterCollection overloads. More info: https://simpleinjector.org/coll1. If your intention is to replace the existing registration with this new registration, you can allow overriding the current registration by setting Container.Options.AllowOverridingRegistrations to true. More info: https://simpleinjector.org/ovrrd.

Stack trace:

[InvalidOperationException: Type IBus has already been registered. If your intention is to resolve a collection of IBus implementations, use the RegisterCollection overloads. More info: https://simpleinjector.org/coll1. If your intention is to replace the existing registration with this new registration, you can allow overriding the current registration by setting Container.Options.AllowOverridingRegistrations to true. More info: https://simpleinjector.org/ovrrd.]
   SimpleInjector.Internals.NonGenericRegistrationEntry.ThrowWhenTypeAlreadyRegistered(InstanceProducer producer) +102
   SimpleInjector.Internals.NonGenericRegistrationEntry.Add(InstanceProducer producer) +59
   SimpleInjector.Container.AddInstanceProducer(InstanceProducer producer) +105
   SimpleInjector.Container.AddRegistrationInternal(Type serviceType, Registration registration) +69
   SimpleInjector.Container.AddRegistration(Type serviceType, Registration registration) +131
   SimpleInjector.Container.RegisterSingleton(TService instance) +183
   Rebus.SimpleInjector.SimpleInjectorContainerAdapter.SetBus(IBus bus) +55
   Rebus.Config.RebusConfigurer.Start() +2356
   MyModule.RegisterServices(Container container) +497
   SimpleInjector.PackageExtensions.RegisterPackages(Container container, IEnumerable`1 assemblies) +50
   Myproject.SimpleInjectorInitializer.InitializeContainer(Container container) +35
   Myproject.SimpleInjectorInitializer.Initialize() +68
   Myproject.Startup.Configuration(IAppBuilder app) +28

EDIT

I then removed the second Configure.With( ... ) block, and now when I call _bus.Send( message ) I get another error in the consumer process:

Unhandled exception 1 while handling message with ID fef3acca-97f4-4495-b09d-96e6c9f66c4d: SimpleInjector.ActivationException: No registration for type IEnumerable<IHandleMessages<MyMessage>> could be found. There is, however, a registration for IHandleMessages<MyMessage>; Did you mean to call GetInstance<IHandleMessages<MyMessage>>() or depend on IHandleMessages<MyMessage>? Or did you mean to register a collection of types using RegisterCollection?

Stack Trace:

2017-04-13 10:21:03,805 [77] WARN  Rebus.Retry.ErrorTracking.InMemErrorTracker - 
   at SimpleInjector.Container.ThrowMissingInstanceProducerException(Type serviceType)
   at SimpleInjector.Container.GetInstanceForRootType[TService]()
   at SimpleInjector.Container.GetInstance[TService]()
   at SimpleInjector.Container.GetAllInstances[TService]()
   at Rebus.SimpleInjector.SimpleInjectorContainerAdapter.<GetHandlers>d__3`1.MoveNext()

Upvotes: 3

Views: 2269

Answers (3)

ilcorvo

Reputation: 486

You cited my SO Answer (question), so I’ll share with you how I implemented it. As you will see, using specific interfaces, I separated commands from events.

Then, only on the consuming side of the queue, I made registrations like these:

    public static void Register()
    {
        var assemblies = AppDomain.CurrentDomain.GetAssemblies()
            .Where(i => i.FullName.StartsWith("MySolutionPrefix"))
            ;

        var container = new Container();

        // http://simpleinjector.readthedocs.io/en/latest/lifetimes.html#perexecutioncontextscope
        container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();


        var serviceType = typeof(IHandleMessages<>).Name;
        // this is extension method to get specific handlers
        IEnumerable<Type> commandHandlers = assemblies.GetHandlers(serviceType, typeof(ICommand));
        container.Register(typeof(IHandleMessages<>), commandHandlers.Concat(new List<Type> { typeof(HistorizeCommandHanlder) }));

        // NOTE Just command Handlers
        container.RegisterCollection(typeof(IHandleMessages<>), commandHandlers);


        var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            //.... logging, transport (I created my own transport on mongoDb), routing, sagas and so on
            .Options(o =>
            {
                // This simply registers my own transport (as Register<IOneWayClientTransport>)
                o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
                // This is more complicated, because I want each message to be copied
                // automatically to a separate topic for each handler
                o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
            })
            .Start();

        container.Verify();

        // Required: without these subscriptions, published messages are delivered to no one
        commandHandlers.GetHandledSubTypes(serviceType, typeof(IVersionedEvent))
            .ToList()
            .ForEach(i => bus.Subscribe(i).Wait());

        // NOTE just events handlers
        IEnumerable<Type> eventHandlers = assemblies
            .GetHandlers(serviceType, typeof(IVersionedEvent))
            .Except(commandHandlers)
            //.Except(new List<Type> { typeof(TempHandlerLogDecorator) })
            ;
        foreach (var handler in eventHandlers)
            ConfigureServiceBus(mongoDbConnectionProvider, db, handler.FullName, new[] { handler });
    }

    private static IBus ConfigureServiceBus(MongoDbConnectionProvider mongoDbConnectionProvider, IMongoDatabase db, string inputQueueName, IEnumerable<Type> handlers)
    {
        var container = new Container();

        container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();

        container.RegisterCollection(typeof(IHandleMessages<>), handlers);

        var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            //.... logging, subscriptions
            // this is a consumer only for inputQueueName
            .Transport(t => t.UseMongoDb(db, settings.TopicBasedMessageQueueName, inputQueueName))
            .Options(o =>
            {
                o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
                o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
            })
            .Start();

        container.Verify();

        handlers.GetHandledSubTypes(typeof(IHandleMessages<>).Name, typeof(IVersionedEvent))
            .ToList()
            .ForEach(i => bus.Advanced.Topics.Subscribe(i.GetDecoupledTopic(settings.DecouplingHandlersRegistration)).Wait());

        return bus;
    }

I'm still using Rebus 3.0.1 and SimpleInjector 3.2.3.

Upvotes: 2

mookid8000

Reputation: 18628

I usually recommend keeping only one single IBus per container instance, because the bus can be considered "an application" in itself, which fits nicely with the fact that an IoC container is an object that can "host" an application for the duration of its lifetime.

Rebus does not provide a Conforming Container abstraction, because I agree with Mark Seemann that that is a project that is doomed to fail. In fact, as the wiki page mentions, Rebus used to provide automatic registration of handlers, but that turned out to be problematic.

Instead, Rebus encourages you to provide a "container adapter" (implementation of IContainerAdapter) whose responsibilities are:

  • look up handlers
  • provide a way to register IBus and IMessageContext in The Correct Way

Container adapters are provided out of the box for Autofac, Castle Windsor, SimpleInjector, etc. However, providing a container adapter is not required: the Configure.With(...) rant is happy with receiving only a "handler activator" (implementation of IHandlerActivator). So if you only want to use your IoC container to look up handlers, and want to take care of registering IBus yourself, you can do that by implementing IHandlerActivator and looking up handlers in your container.
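
A handler activator along those lines might look roughly like the sketch below. The class name SimpleInjectorHandlerActivator is hypothetical, and the GetHandlers signature and using directives are written from memory of Rebus 2+, so treat them as assumptions and check them against the Rebus version you use. The sketch also assumes handlers were registered as a collection (RegisterCollection) and does not deal with scoped lifestyles.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Handlers;
using Rebus.Transport;
using SimpleInjector;

// Hypothetical class: it only looks up handlers and never registers IBus.
public class SimpleInjectorHandlerActivator : IHandlerActivator
{
    private readonly Container _container;

    public SimpleInjectorHandlerActivator(Container container)
    {
        _container = container;
    }

    // Assumed signature of IHandlerActivator.GetHandlers (verify for your Rebus version).
    public Task<IEnumerable<IHandleMessages<TMessage>>> GetHandlers<TMessage>(
        TMessage message, ITransactionContext transactionContext)
    {
        // Resolves every handler registered for TMessage as a collection.
        IEnumerable<IHandleMessages<TMessage>> handlers =
            _container.GetAllInstances<IHandleMessages<TMessage>>().ToList();

        return Task.FromResult(handlers);
    }
}
```

With something like this, you would pass new SimpleInjectorHandlerActivator(container) to Configure.With(...) instead of the adapter, keep the IBus returned by Start() yourself, and decide where (or whether) to register it.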

TL;DR: The Rebus Way is to treat an instance of your IoC container as a separate application, and therefore it makes sense to register only one IBus in it.

It is perfectly fine to new up multiple container instances if you want to host multiple applications (or even multiple instances of your application with different message SLAs) in a single process.
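
As a minimal sketch of that idea, assuming the two Azure Service Bus queues from the question: each queue gets its own Container and its own adapter, so IBus is registered only once per container. BusHost and StartBus are hypothetical names introduced for illustration, and the using directives may differ between package versions. Handlers are registered with RegisterCollection because the adapter resolves them through GetAllInstances, as the second stack trace in the question shows.

```csharp
using System;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Retry.Simple;
using Rebus.SimpleInjector;
using SimpleInjector;

// Hypothetical helper, not part of Rebus or SimpleInjector.
public static class BusHost
{
    // Builds one container that hosts exactly one bus listening on one queue.
    // Keep the returned container alive for as long as the bus should run.
    public static Container StartBus(
        string connectionString, string queueName, string errorQueueName,
        params Type[] handlerTypes)
    {
        var container = new Container();

        // The adapter looks handlers up as a collection, so register them as one.
        container.RegisterCollection(typeof(IHandleMessages<>), handlerTypes);

        // Same configuration calls as in the question, applied to a fresh adapter.
        Configure.With(new SimpleInjectorContainerAdapter(container))
            .Transport(t => t.UseAzureServiceBus(connectionString, queueName))
            .Options(o =>
            {
                o.SetNumberOfWorkers(1);
                o.SimpleRetryStrategy(errorQueueAddress: errorQueueName, maxDeliveryAttempts: 3);
            })
            .Start();

        return container;
    }
}
```

It would be called once per queue, for example BusHost.StartBus(connectionString, "A_QUEUE_NAME", "A_ERROR_QUEUE_NAME", typeof(MyMessageHandler)), and again with the B queue names and that queue's handler type.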

Upvotes: 5

Steven

Reputation: 172696

The exception states: "Type IBus has already been registered". According to your stack trace, the second time IBus is being added is inside the SimpleInjectorContainerAdapter. You will have to find out where it was registered the first time. This is easy to do: just register a dummy IBus as the very first registration after creating the Container, and take a look at the stack trace where it blows up.
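
A sketch of that diagnostic, assuming a throwaway factory delegate is an acceptable "dummy" (the class and method names are hypothetical, and the placeholder should be removed once the culprit is found):

```csharp
using System;
using Rebus.Bus;
using SimpleInjector;

// Hypothetical diagnostic helper, for temporary use only.
public static class IBusRegistrationProbe
{
    public static Container CreateContainer()
    {
        var container = new Container();

        // Placeholder registered before anything else. It is never meant to be
        // resolved; it exists only so that the next IBus registration throws.
        container.Register<IBus>(() =>
        {
            throw new InvalidOperationException("Placeholder IBus registration.");
        });

        return container;
    }
}
```

If the rest of the setup (for example the RegisterPackages call from the question's stack trace) then runs against this container, the first code path that registers IBus fails with the same "already been registered" exception, and its stack trace shows where that registration comes from.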

Upvotes: 2
