Reputation: 111
Using mass transit. Everything is working fine and consuming/publishing to the correct topics / queues that were created beforehand.
But it's creating extra topics / subscriptions that it doesn't use. I've tried to follow and scrap together different available options but either MT doesn't publish to the correct topic or it has no effect.
Setup Code:
services.AddMassTransit((cfg) =>
{
cfg.AddConsumer<WorkOneConsumer, WorkOneConsumerDefinition>()
.Endpoint(e =>
{
e.Name = "queue:" + MassTransitDemoConstants.WORKFLOWONE_QUEUE;
e.Temporary = true;
e.ConfigureConsumeTopology = false;
});
cfg.AddConsumer<WorkTwoConsumer, WorkTwoConsumerDefinition>()
.Endpoint(e =>
{
e.Name = "queue:" + MassTransitDemoConstants.WORKFLOWTWO_QUEUE;
e.Temporary = true;
e.ConfigureConsumeTopology = false;
});
cfg.UsingAzureServiceBus((context, config) =>
{
config.Host(Configuration.GetValue<string>("AppSettings:ServiceBusConnectionString"));
//config.UseRateLimit();
//config.UseMessageRetry();
//config.UseCircuitBreaker();
config.Message<IWorkflowOneCompletedEvent>(config => config.SetEntityName(MassTransitDemoConstants.WORKFLOW_TOPIC));
config.Message<IWorkflowTwoCompletedEvent>(config => config.SetEntityName(MassTransitDemoConstants.WORKFLOW_TOPIC));
config.Message<IWorkflowFailedEvent>(config => config.SetEntityName(MassTransitDemoConstants.WORKFLOW_TOPIC));
config.ReceiveEndpoint(MassTransitDemoConstants.WORKFLOWONE_QUEUE, e =>
{
e.ConfigureConsumeTopology = false;
e.Consumer<WorkOneConsumer>(context);
});
config.ReceiveEndpoint(MassTransitDemoConstants.WORKFLOWTWO_QUEUE, e =>
{
e.ConfigureConsumeTopology = false;
e.Consumer<WorkTwoConsumer>(context);
});
config.Publish<IWorkflowOneCompletedEvent>(x => x.Exclude = true);
config.DeployPublishTopology = false;
config.ConfigureEndpoints(context);
});
});
This is one of the worker functions. It's trying to listen to the WorkOne / WorkTwo Commands (through queue which is working perfectly fine), and publishing events (through topics) to topic 'test' which is the value of 'MassTransitDemoConstants.WORKFLOW_TOPIC'.
It's not creating these topics on startup - it's creating these extra ones when I publish that event on my consumer:
await _publishEndpoint.Publish(new WorkflowOneCompletedEvent
{
CorrelationId = command.CorrelationId,
SubscriptionName = MassTransitDemoConstants.WORKFLOW_ONECOMPLETE_SUBSCRIPTION,
ConsumerDataName = "WorkerProject.Function"
}, context => { context.Headers.Set("subscriptionName", MassTransitDemoConstants.WORKFLOW_ONECOMPLETE_SUBSCRIPTION); });
This line above creates workerproject.function.models~workflowonecompletedevent
as a topic, and MassTransitDemoConstants.WORKFLOW_TOPIC (which is 'test')
as a subscription to that topic. It should be the other way around at the very least.
doesn't do anything to my current setup.
To reiterate, the programs are working - I am able to see my saga finish successfully. I just see these extra topics and subscriptions made.
The ideal scenario is for MT to not create any topics / subscriptions on its own. To that, I've tried setting ConfigureConsumeTopology = false
, DeployPublishTopology = false
, some attributes:
[ConfigureConsumeTopology...]
[ExcludeFromTopology]
[ExcludeFromImplementedTypes]
but to no avail.
What's wrong with my setup?
EDIT:
My consumer kicks off, does its thing, then publishes the IWorkflowOneCompletedEvent
to the right topic.
Without receiving that published message, the state of my service bus is:
(correct topic)
Topic: 'test'
Subscription: 'workflowOneQueue'
messages: 1
(created topic)
Topic: `workerproject.function.models~workflowonecompletedevent`
Subscription: 'test'
messages: 0
So it's publishing to the right place, but it's creating an extra erroneous topic.
Upvotes: 0
Views: 290
Reputation: 33457
e.Consumer<WorkOneConsumer>(context);
should be
e.ConfigureConsumer<WorkOneConsumer>(context);
This:
.Endpoint(e => { e.Name = "queue:" + MassTransitDemoConstants.WORKFLOWONE_QUEUE; e.Temporary = true; e.ConfigureConsumeTopology = false; })
is pointless, since you're manually configuring the receive endpoints. You can also remove:
config.ConfigureEndpoints(context);
Upvotes: 0