J.C.A. Kokenberg
J.C.A. Kokenberg

Reputation: 23

Disconnect consumer while bus is connected

We have the following use case:

We have two busses (internal and external). The idea is that our own services use the internal bus and all third-party services use the external bus. We have a created a service that acts as a message router (aptly named MessageRouter).

When a message is published on the internal bus, the MessageRouter can pick up that message and place it on the external bus, and vice-versa. Through configuration we can tell which messages are allowed to pass from internal to external of from external to internal. This in itself works fine.

We have grouped our messages, we have Events, Commands and Requests. Each service has three ReceiveEnpoints, one for each message type. This means that all events are queued on the Events queue etc. Each message to which a service 'subscribes' gets it's own consumer.

var queues = new Dictionary<string, IEnumerable<Type>>
{
    // ReSharper disable once PossibleMultipleEnumeration
    { "Events", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IEvent).IsAssignableFrom(ga))) },
    // ReSharper disable once PossibleMultipleEnumeration
    { "Commands", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(ICommand).IsAssignableFrom(ga))) },
    // ReSharper disable once PossibleMultipleEnumeration
    { "Queries", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IQuery).IsAssignableFrom(ga))) }
};

foreach (var queue in queues)
{
    config.ReceiveEndpoint(GetConsumerQueueName(queue.Key), cfg =>
    {
        foreach (var consumerType in queue.Value)
        {
            cfg.Consumer(consumerType, consumerContainer.Resolve);
        }
        configurator?.Invoke((T)cfg);
    });
}

Where config is a IBusFactoryConfigurator. This code called when the service starts.

What we would like to be able to do in our MessageRouter is to 'dynamically' add and, more importantly, remove consumers from the ReceiveEndpoint.

So far, we haven't had any luck. We have tried to add the Consumer through the use of the ConnectConsumer method on the BusControl instance. This gives us a ConnectHandle which has a Disconnect method. However, when using this approach, messages are not picked up by our consumers. Looking at the handle shows us that it is a MultipleConnectHandle, however, the handle has no 'internal' handles.

Is there any way to use the Consumer method to register the different consumers and to get their ConnectHandles, So that we can Disconnect the consumer if needed?

As stated, ideally we would like to be able to dynamically add and remove consumers to a ReceiveEndpoint.

Upvotes: 2

Views: 1359

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33457

You can't add/remove consumers on a receive endpoint while the bus is started, that isn't supported in any way, shape, or form.

You can, however, connect new receive endpoints on separate queues with one or more consumers. In your case above, it seems like you are not getting your consumers registered before connecting the receive endpoint, which is why you aren't seeing anything in the handle collection.

Upvotes: 2

Related Questions