Reputation: 891
Hi I want to have a rebus with two queue,this is how i config it right now
container.Register<BuiltinHandlerActivator>(() =>
{
var activator = new BuiltinHandlerActivator();
var rebusConnection = configuration["Rebus:ConnectionString"];
activator.Register(() => ActivatorUtilities.CreateInstance<CampaignsHandler>(container));
activator.Register(() => ActivatorUtilities.CreateInstance<MessageHandler>(container));
activator.Register(() => ActivatorUtilities.CreateInstance<DeliveryStatusHandler>(container));
Log.Logger = container.GetInstance<ILogger>();
Configure.With(activator)
.Transport(t => t.UseSqlServer(rebusConnection, "RebusQueue", "BackgroundJobs"))
.Logging(l => l.Serilog(Log.Logger))
.Routing(r =>
{
var typeBasedRouting = r.TypeBased();
typeBasedRouting.MapAssemblyOf<MessageSent>("BackgroundJobs");
})
.Options(o => o.SetNumberOfWorkers(6))
.Options(o => o.SetMaxParallelism(6))
.Options(b => b.SimpleRetryStrategy(maxDeliveryAttempts: 1))
.Start();
activator.Bus.Subscribe<MessageSent>().Wait();
return activator;
}, Lifestyle.Singleton);
my MessageHandler processed sendMessage objects and turn them to messageSent obj with status infomation and my DeliveryStatusHandler processed messageSent obj to update my database. this problem is I only have one queue(one database table "RebusQueue").So only after all the sendMessage objects are done now the database start to update.
I want to process my messageSent obj right after my sendMessage objects got processed.so I assume I should have two queues(two table)? but how to config the rebus?
I looked at the question Multiple input queues in one rebus process are we facing the same problems?
this is what i do in Messagehandler.cs
class MessageHandler : IHandleMessages<SendMessage>
{
private readonly IBus _bus;
private MessagingRuntime _messagingRuntime;
private IRepository _repository;
private ISnapshotRepository _snapshotRepository;
private readonly ITemplateEngineProvider _templateEngineProvider;
private IUrlShortener _urlShortener;
private ILogger _logger;
public MessageHandler(MessagingRuntime messagingRuntime, IRepository repository, ISnapshotRepository snapshotRepository, ITemplateEngineProvider templateEngineProvider, IUrlShortener urlShortener, IBus bus, ILogger logger)
{
_messagingRuntime = messagingRuntime;
_repository = repository;
_snapshotRepository = snapshotRepository;
_templateEngineProvider = templateEngineProvider;
_urlShortener = urlShortener;
_bus = bus;
_logger = logger;
}
public async Task Handle(SendMessage message)
{
var template = _snapshotRepository.Query<Template>(message.DateCreated).Where(x => x.Id == message.TemplateId).FirstOrDefault();
var subscriber = _snapshotRepository.Query<Subscriber>(message.DateCreated).Where(x => x.Id == message.SubscriberId).FirstOrDefault();
var templateEngine = GetTemplateEngine(message.CampaignId, message.Tags);
var @event = new MessageSent
{
Id = SequentialGuid.Instance.NewGuid(),
DeliveryStatusId = message.DeliveryStatusId,
SubscriberId = subscriber.Id,
DateCreated = message.DateCreated
};
try
{
_messagingRuntime.ProcessSendRequest(new[] { subscriber }, templateEngine, template);
@event.IsDeliverySuccessful = true;
}
catch (MessagingException ex)
{
_logger.Error(ex.ToString());
@event.IsDeliverySuccessful = false;
}
await _bus.Publish(@event);
}
}
this is the second handler which will (update the database)
class DeliveryStatusHandler : IHandleMessages<MessageSent>
{
private ILogger _logger;
private IRepository _repository;
private IRepository2 _repository2;
private ISnapshotRepository _snapshotRepository;
public DeliveryStatusHandler(IRepository repository, IRepository2 repository2,ISnapshotRepository snapshotRepository, ILogger logger)
{
_repository = repository;
_repository2 = repository2;
_snapshotRepository = snapshotRepository;
_logger = logger;
}
public Task Handle(MessageSent @event)
{
var deliveryStatus = _repository2.Find<DeliveryStatus>(@event.DeliveryStatusId);
if (deliveryStatus == null)
{
_logger.Error("Delivery Status does not exist");
return Task.FromResult<object>(null);
}
var deliveryStatusItem = _repository2.Find<DeliveryStatusItem>(@event.Id);
var subscriber = _snapshotRepository.Query<Subscriber>(@event.DateCreated).Where(x => x.Id == @event.SubscriberId).FirstOrDefault();
if (deliveryStatusItem == null)
{
deliveryStatusItem = new DeliveryStatusItem();
deliveryStatusItem.Id = @event.Id;
deliveryStatusItem.Email = subscriber.Email;
deliveryStatusItem.PhoneNumber = subscriber.PhoneNumber;
deliveryStatusItem.Name = subscriber.Name;
}
deliveryStatusItem.DeliveryStatusId = @event.DeliveryStatusId;
deliveryStatusItem.IsDeliverySuccessful = @event.IsDeliverySuccessful;
_repository2.Save<DeliveryStatusItem>(deliveryStatusItem);
return Task.FromResult<object>(null);
}
}
Upvotes: 2
Views: 1127
Reputation: 18628
If I were you, I would start two Rebus instances, each having its own input queue (it's totally fine to do it in the same process if you don't need to be able to update them independently of each other).
The two instances would be:
MessageSent
and handle it accordinglyThis way you can easily configure the number of threads and the parallelism settings for each instance, allowing you to throttle/scale the processing of the messages.
Upvotes: 2