Reputation: 33
Can I configure MassTransit with Amazon SQS to use multiple AWS regions at the same time? E.g. to set up a topic in region eu-west-1, but the queue which subscribes to this topic would be in us-east-1.
Some background on the architecture, to explain the motive: I have a distributed system of hundreds of microservices spread across AWS regions (say eu-west-1 and us-east-1). Each region has its own VPC and Kubernetes cluster, and I plan to expand to more regions in the future. The app uses the CQRS pattern with event sourcing, and the "command" instances live in eu-west-1. Let's call this region "central".
The microservices communicate via events. Right now these events go through RabbitMQ, orchestrated by MassTransit. I'd like to switch the transport from RabbitMQ to Amazon SQS/SNS, but keep the topic, queue and event orchestration in MassTransit. The idea is that "command" endpoints (e.g. creating or updating an entity) are called on the "central" instance, which then publishes a create/update event to the event bus. That event is consumed in "central" and in all other regions (currently only us-east-1). I have several hundred message types and tens of k8s namespaces in these clusters. One important constraint: when a new region is added, I can't take the existing services down for maintenance to add new configuration values to services that are already running and communicating over SQS.
How should I approach this issue?
I tried setting up the topics in "central" and binding the queues from us-east-1 to them manually, but couldn't get it to work fully with MassTransit. The messages do reach the consumers, but for some reason they are skipped (I see the "SKIP" event in the logs).
Publisher setup:
public class Program
{
    public static async Task Main(string[] args)
    {
        await CreateHostBuilder(args).Build().RunAsync();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((_, services) =>
            {
                var ns = "namespace";
                services.AddMassTransit(x =>
                {
                    x.SetEndpointNameFormatter(new FifoEndpointNameFormatter(new KebabCaseEndpointNameFormatter(ns, false)));
                    x.UsingAmazonSqs((context, cfg) =>
                    {
                        cfg.MessageTopology.SetEntityNameFormatter(new FifoEntityNameFormatter());
                        cfg.PublishTopology.TopicAttributes[QueueAttributeName.ContentBasedDeduplication] = "true";

                        // The publisher bus is bound to the "central" region.
                        cfg.Host("eu-west-1", configurator =>
                        {
                            configurator.AccessKey("*****");
                            configurator.SecretKey("*****");
                            configurator.Scope(ns, true);
                        });

                        cfg.Publish<Bye>();
                        cfg.Publish<Hello>();
                        cfg.UsePublishFilter(typeof(PublishFilter<>), context);
                    });
                });
                services.AddHostedService<Worker>();
            });
}
Consumer setup:
public class Program
{
    public static async Task Main(string[] args)
    {
        await CreateHostBuilder(args).Build().RunAsync();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((_, services) =>
            {
                var ns = "namespace";
                services.AddMassTransit(x =>
                {
                    x.SetEndpointNameFormatter(
                        new FifoEndpointNameFormatter(new KebabCaseEndpointNameFormatter(ns, false)));
                    x.AddConsumer<HelloConsumer>();
                    x.AddConsumer<ByeConsumer>();
                    var qnf = new QueueNameFormatter(new FifoEntityNameFormatter());
                    qnf.Namespace = ns;
                    x.UsingAmazonSqs((context, cfg) =>
                    {
                        cfg.UseConsumeFilter(typeof(ConsumeFilter<>), context);
                        cfg.MessageTopology.SetEntityNameFormatter(new FifoEntityNameFormatter());

                        // The consumer bus is bound to a different region than the publisher.
                        cfg.Host("us-east-1", configurator =>
                        {
                            configurator.AccessKey("*****");
                            configurator.SecretKey("*****");
                            configurator.Scope(ns, true);
                        });

                        cfg.ReceiveEndpoint("namespace_Hello.fifo", configurator =>
                        {
                            // The queue is bound to the topic manually, so MassTransit
                            // must not create the topic subscriptions itself.
                            configurator.ConfigureConsumeTopology = false;
                            configurator.ConfigureConsumer<HelloConsumer>(context);
                            configurator.QueueAttributes[QueueAttributeName.FifoQueue] = true;
                            configurator.QueueAttributes[QueueAttributeName.ContentBasedDeduplication] = true;
                            configurator.DiscardSkippedMessages();
                            configurator.DiscardFaultedMessages();
                            configurator.PublishFaults = false;
                        });

                        cfg.ReceiveEndpoint("namespace_Bye.fifo", configurator =>
                        {
                            configurator.ConfigureConsumeTopology = false;
                            configurator.ConfigureConsumer<ByeConsumer>(context);
                            configurator.QueueAttributes[QueueAttributeName.FifoQueue] = true;
                            configurator.QueueAttributes[QueueAttributeName.ContentBasedDeduplication] = true;
                            configurator.DiscardSkippedMessages();
                            configurator.DiscardFaultedMessages();
                            configurator.PublishFaults = false;
                        });
                    });
                });
            });
}
Consumer logs (redacted namespaces):
Thanks in advance!
Upvotes: 0
Views: 557
Reputation: 33298
You cannot, as there is no way to specify out-of-region addresses within a given bus. If you created the SNS subscriptions yourself, you could make it so that published messages would go to the other region, but that isn't something MassTransit is going to do for you.
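To illustrate what "creating the SNS subscriptions yourself" could look like, here is a minimal sketch using the AWS SDK for .NET directly, outside of MassTransit. It assumes the topic already exists in eu-west-1 and the queue already exists in us-east-1. The topic ARN and account id are hypothetical placeholders, the queue name is taken from the consumer setup in the question, and credentials are assumed to come from the SDK's default credential chain. It does not address whether MassTransit will then accept the delivered messages or why they are skipped.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using Amazon;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical placeholder: replace with the ARN of the real topic in eu-west-1.
const string topicArn = "arn:aws:sns:eu-west-1:123456789012:namespace_Hello.fifo";
// Queue name as used by the receive endpoint in the question.
const string queueName = "namespace_Hello.fifo";

// One client per region: the queue lives in us-east-1, the topic in eu-west-1.
using var sqs = new AmazonSQSClient(RegionEndpoint.USEast1);
using var sns = new AmazonSimpleNotificationServiceClient(RegionEndpoint.EUWest1);

// Look up the queue URL and ARN in the consumer's region.
var queueUrl = (await sqs.GetQueueUrlAsync(queueName)).QueueUrl;
var queueArn = (await sqs.GetQueueAttributesAsync(
    queueUrl, new List<string> { "QueueArn" })).QueueARN;

// Queue policy that lets this one SNS topic send messages to the queue.
// Note: setting "Policy" replaces any policy already attached to the queue.
var policy = JsonSerializer.Serialize(new
{
    Version = "2012-10-17",
    Statement = new[]
    {
        new
        {
            Effect = "Allow",
            Principal = new { Service = "sns.amazonaws.com" },
            Action = "sqs:SendMessage",
            Resource = queueArn,
            Condition = new
            {
                ArnEquals = new Dictionary<string, string> { ["aws:SourceArn"] = topicArn }
            }
        }
    }
});

await sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest
{
    QueueUrl = queueUrl,
    Attributes = new Dictionary<string, string> { ["Policy"] = policy }
});

// Create the subscription on the topic's side (eu-west-1), pointing at the
// queue ARN in us-east-1.
var subscription = await sns.SubscribeAsync(new SubscribeRequest
{
    TopicArn = topicArn,
    Protocol = "sqs",
    Endpoint = queueArn
});

Console.WriteLine($"Subscribed: {subscription.SubscriptionArn}");
```

With this approach the cross-region binding is managed outside the bus configuration, which is why the receive endpoints keep `ConfigureConsumeTopology = false`. Every topic/queue pair needs its own subscription, so with several hundred message types this would probably have to be scripted or moved into infrastructure-as-code.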
Upvotes: 1