Bob Zhang

Reputation: 891

How can I have Rebus with two queues?

Hi, I want to set up Rebus with two queues. This is how I configure it right now:

container.Register<BuiltinHandlerActivator>(() =>
        {
            var activator = new BuiltinHandlerActivator();               
            var rebusConnection = configuration["Rebus:ConnectionString"];
            activator.Register(() => ActivatorUtilities.CreateInstance<CampaignsHandler>(container));
            activator.Register(() => ActivatorUtilities.CreateInstance<MessageHandler>(container));
            activator.Register(() => ActivatorUtilities.CreateInstance<DeliveryStatusHandler>(container));
            Log.Logger = container.GetInstance<ILogger>();
            Configure.With(activator)
                .Transport(t => t.UseSqlServer(rebusConnection, "RebusQueue", "BackgroundJobs"))
                .Logging(l => l.Serilog(Log.Logger))
                .Routing(r =>
                {
                    var typeBasedRouting = r.TypeBased();
                    typeBasedRouting.MapAssemblyOf<MessageSent>("BackgroundJobs");                     

                })
                .Options(o => o.SetNumberOfWorkers(6))
                .Options(o => o.SetMaxParallelism(6))
                .Options(b => b.SimpleRetryStrategy(maxDeliveryAttempts: 1))
                .Start();               
            activator.Bus.Subscribe<MessageSent>().Wait();

            return activator;
        }, Lifestyle.Singleton);

My MessageHandler processes SendMessage objects and turns them into MessageSent objects with status information, and my DeliveryStatusHandler processes the MessageSent objects to update my database. The problem is that I only have one queue (one database table, "RebusQueue"), so the database only starts to update after all the SendMessage objects have been processed.

I want each MessageSent object to be processed right after its SendMessage object has been processed, so I assume I need two queues (two tables)? If so, how do I configure Rebus for that?

I looked at the question "Multiple input queues in one rebus process". Are we facing the same problem?

This is what I do in MessageHandler.cs:

class MessageHandler : IHandleMessages<SendMessage>
{
    private readonly IBus _bus;
    private MessagingRuntime _messagingRuntime;
    private IRepository _repository;
    private ISnapshotRepository _snapshotRepository;
    private readonly ITemplateEngineProvider _templateEngineProvider;
    private IUrlShortener _urlShortener;
    private ILogger _logger;

    public MessageHandler(MessagingRuntime messagingRuntime, IRepository repository, ISnapshotRepository snapshotRepository, ITemplateEngineProvider templateEngineProvider, IUrlShortener urlShortener, IBus bus, ILogger logger)
    {
        _messagingRuntime = messagingRuntime;
        _repository = repository;
        _snapshotRepository = snapshotRepository;
        _templateEngineProvider = templateEngineProvider;
        _urlShortener = urlShortener;
        _bus = bus;
        _logger = logger;
    }

    public async Task Handle(SendMessage message)
    {
        var template = _snapshotRepository.Query<Template>(message.DateCreated).Where(x => x.Id == message.TemplateId).FirstOrDefault();
        var subscriber = _snapshotRepository.Query<Subscriber>(message.DateCreated).Where(x => x.Id == message.SubscriberId).FirstOrDefault();
        var templateEngine = GetTemplateEngine(message.CampaignId, message.Tags);


        var @event = new MessageSent
        {
            Id = SequentialGuid.Instance.NewGuid(),
            DeliveryStatusId = message.DeliveryStatusId,
            SubscriberId = subscriber.Id,
            DateCreated = message.DateCreated                
        };

        try
        {
            _messagingRuntime.ProcessSendRequest(new[] { subscriber }, templateEngine, template);
            @event.IsDeliverySuccessful = true;
        }
        catch (MessagingException ex)
        {
            _logger.Error(ex.ToString());
            @event.IsDeliverySuccessful = false;
        }

        await _bus.Publish(@event);
    }
}

This is the second handler, which updates the database:

class DeliveryStatusHandler : IHandleMessages<MessageSent>
{
    private ILogger _logger;
    private IRepository _repository;
    private IRepository2 _repository2;
    private ISnapshotRepository _snapshotRepository;

    public DeliveryStatusHandler(IRepository repository, IRepository2 repository2,ISnapshotRepository snapshotRepository, ILogger logger)
    {
        _repository = repository;
        _repository2 = repository2;
        _snapshotRepository = snapshotRepository;
        _logger = logger;
    }

    public Task Handle(MessageSent @event)
    {
        var deliveryStatus = _repository2.Find<DeliveryStatus>(@event.DeliveryStatusId);
        if (deliveryStatus == null)
        {
            _logger.Error("Delivery Status does not exist");
            return Task.FromResult<object>(null);
        }

        var deliveryStatusItem = _repository2.Find<DeliveryStatusItem>(@event.Id);
        var subscriber = _snapshotRepository.Query<Subscriber>(@event.DateCreated).Where(x => x.Id == @event.SubscriberId).FirstOrDefault();
        if (deliveryStatusItem == null)
        {
            deliveryStatusItem = new DeliveryStatusItem();
            deliveryStatusItem.Id = @event.Id;
            deliveryStatusItem.Email = subscriber.Email;
            deliveryStatusItem.PhoneNumber = subscriber.PhoneNumber;
            deliveryStatusItem.Name = subscriber.Name;

        }
        deliveryStatusItem.DeliveryStatusId = @event.DeliveryStatusId;
        deliveryStatusItem.IsDeliverySuccessful = @event.IsDeliverySuccessful;
        _repository2.Save<DeliveryStatusItem>(deliveryStatusItem);
        return Task.FromResult<object>(null);
    }
}

Upvotes: 2

Views: 1127

Answers (1)

mookid8000

Reputation: 18628

If I were you, I would start two Rebus instances, each having its own input queue (it's totally fine to do it in the same process if you don't need to be able to update them independently of each other).

The two instances would be:

  1. The first one with the handler you showed me
  2. The second one that will subscribe to MessageSent and handle it accordingly

This way you can easily configure the number of threads and the parallelism settings for each instance, allowing you to throttle/scale the processing of the messages.
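To make the two-instance idea concrete, here is a rough sketch rather than a definitive setup. It reuses the three-argument `UseSqlServer` call and the `container` and `rebusConnection` variables from the question. The queue names "SendMessages" and "DeliveryStatus" and the worker counts are made up for illustration, and logging is left out for brevity.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Retry.Simple;
using Rebus.Routing.TypeBased;

// Instance 1: receives SendMessage and publishes MessageSent.
// "SendMessages" is a hypothetical input queue name.
var senderActivator = new BuiltinHandlerActivator();
senderActivator.Register(() => ActivatorUtilities.CreateInstance<MessageHandler>(container));

Configure.With(senderActivator)
    .Transport(t => t.UseSqlServer(rebusConnection, "RebusQueue", "SendMessages"))
    .Options(o =>
    {
        // Illustrative values; tune them for the sending workload.
        o.SetNumberOfWorkers(4);
        o.SetMaxParallelism(4);
        o.SimpleRetryStrategy(maxDeliveryAttempts: 1);
    })
    .Start();

// Instance 2: subscribes to MessageSent and updates the database.
// "DeliveryStatus" is a hypothetical input queue name.
var statusActivator = new BuiltinHandlerActivator();
statusActivator.Register(() => ActivatorUtilities.CreateInstance<DeliveryStatusHandler>(container));

Configure.With(statusActivator)
    .Transport(t => t.UseSqlServer(rebusConnection, "RebusQueue", "DeliveryStatus"))
    // MessageSent is owned by the first instance, so the subscription
    // request is routed to that instance's input queue.
    .Routing(r => r.TypeBased().Map<MessageSent>("SendMessages"))
    .Options(o =>
    {
        // Throttled independently of the first instance.
        o.SetNumberOfWorkers(2);
        o.SetMaxParallelism(2);
        o.SimpleRetryStrategy(maxDeliveryAttempts: 1);
    })
    .Start();

statusActivator.Bus.Subscribe<MessageSent>().Wait();
```

A few things this sketch does not cover: whatever sends SendMessage has to route it to the first instance's queue, the IBus injected into MessageHandler should be the first instance's bus, and depending on your transport and Rebus version, publish/subscribe may also need a subscription storage to be configured.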

Upvotes: 2
