Reputation: 522
I'm new to MassTransit and still playing around with some of the tutorial projects. I'm going to have a service that will run for maybe 20 minutes, and I need to do something when it's done. Because it can take so long, I don't want to follow the request/response pattern and await the response, holding up the thread. I think my other option is to create another queue just for the consumer to publish to when the job is done. I've looked at this post: MassTransit3 how to make request from consumer, but I'm not sure how to implement this. My projects, again from this tutorial, look like this:
Publisher:
static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x =>
        x.Host(new Uri("rabbitmq://localhost/"), h => {}));
    var busHandle = bus.Start();
    var text = "";
    Console.WriteLine("Publisher");
    while (text != "quit")
    {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();
        var message = new SomethingHappenedMessage()
        {
            What = text,
            When = DateTime.Now
        };
        bus.Publish(message);
    }
    busHandle.Stop();
}
Subscriber:
static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        var host = x.Host(new Uri("rabbitmq://localhost"), h => {});
        x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
            e.Consumer<SomethingHappenedConsumer>());
    });
    Console.WriteLine("Subscriber");
    var busHandle = bus.Start();
    Console.ReadKey();
    busHandle.Stop();
}
Consumer:
class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
    public Task Consume(ConsumeContext<ISomethingHappened> context)
    {
        Console.Write("TXT: " + context.Message.What);
        Console.Write(" SENT: " + context.Message.When);
        Console.Write(" PROCESSED: " + DateTime.Now);
        Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
        return Task.FromResult(0);
    }
}
How would I go about creating a callback queue in the consumer?
Upvotes: 1
Views: 2261
Reputation: 522
Thanks again to @Travis for the help. Just wanted to show the final code I ended up with for anyone in the future. The messaging looks funny for the response but it is correctly posting back to the publisher.
Publisher:
static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
        x.ReceiveEndpoint(host, "MtPubSubExample_TestPublisher", e =>
            e.Consumer<ResponseConsumer>());
    });
    var busHandle = bus.Start();
    var text = "";
    Console.WriteLine("Publisher");
    while (text != "quit")
    {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();
        var message = new SomethingHappenedMessage()
        {
            What = text,
            When = DateTime.Now
        };
        bus.Publish(message);
    }
    busHandle.Stop();
}
Response Consumer:
class ResponseConsumer : IConsumer<IResponse>
{
    public Task Consume(ConsumeContext<IResponse> context)
    {
        Console.WriteLine("RESPONSE MESSAGE: " + context.Message.Message);
        return Task.FromResult(0);
    }
}
Subscriber:
static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
        x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
            e.Consumer<SomethingHappenedConsumer>());
    });
    Console.WriteLine("Subscriber");
    var busHandle = bus.Start();
    Console.ReadKey();
    busHandle.Stop();
}
Subscriber Consumer:
class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
    public Task Consume(ConsumeContext<ISomethingHappened> context)
    {
        var now = DateTime.Now;
        Console.Write("TXT: " + context.Message.What);
        Console.Write(" SENT: " + context.Message.When);
        Console.Write(" PROCESSED: " + now);
        Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
        var response = new ResponseMessage()
        {
            Message = "The request was processed at " + now
        };
        // Publish through the ConsumeContext instead of creating a second,
        // never-started bus for every consumer instance.
        return context.Publish(response);
    }
}
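The message contracts themselves are not shown above. A minimal sketch of what they might look like follows. The type and property names are the ones used in the code, but the property types are only inferred from how they are assigned, so treat these definitions as assumptions; the tutorial's actual contracts may differ.

```csharp
using System;

// Assumed contract for the original event; What and When are the
// properties the publisher sets and the subscriber consumer reads.
public interface ISomethingHappened
{
    string What { get; }
    DateTime When { get; }
}

// Concrete message the publisher creates (assumed to implement the interface
// so that IConsumer<ISomethingHappened> receives it).
public class SomethingHappenedMessage : ISomethingHappened
{
    public string What { get; set; }
    public DateTime When { get; set; }
}

// Assumed contract for the "job finished" notification sent back.
public interface IResponse
{
    string Message { get; }
}

// Concrete response the subscriber consumer publishes (assumed to implement
// IResponse so that ResponseConsumer receives it).
public class ResponseMessage : IResponse
{
    public string Message { get; set; }
}
```

Consumers are registered against the interfaces, so each concrete class has to implement its interface for the published message to reach the matching consumer.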
Upvotes: 1
Reputation: 10567
In your consumer, just Bus.Publish(new ResponseMessage());
(or whatever you call your response) and have your publisher register a consumer for that message type. Your publisher doesn't appear to be bound to a queue, so just make up a queue name and bind the publisher to that queue as well.
Upvotes: 1