adam

Reputation: 522

MassTransit: Creating a callback queue in consumer

New to MassTransit and still playing around with some of the tutorial projects. I'm going to have a service that will run for maybe 20 minutes, and I need to do something when it's done. Because it can take so long, I don't want to follow the request/response pattern and await the response, holding up the thread. I think my other option is to create another queue just for the consumer to publish to when the job is done. I've looked at this post: "MassTransit3 how to make request from consumer", but I'm not sure how to implement this. My projects, again from this tutorial, look like this:

Publisher:

static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x =>
        x.Host(new Uri("rabbitmq://localhost/"), h => {}));
    var busHandle = bus.Start();
    var text = "";
    Console.WriteLine("Publisher");
    while(text != "quit")
    {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();

        var message = new SomethingHappenedMessage()
        {
            What = text,
            When = DateTime.Now
        };
        bus.Publish(message);
    }
    busHandle.Stop();
}

Subscriber:

static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    {
        var host = x.Host(new Uri("rabbitmq://localhost"), h => {});
        x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
            e.Consumer<SomethingHappenedConsumer>());
    });
    Console.WriteLine("Subscriber");
    var busHandle = bus.Start();
    Console.ReadKey();
    busHandle.Stop();
}

Consumer:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
    public Task Consume(ConsumeContext<ISomethingHappened> context)
    {
        Console.Write("TXT: " + context.Message.What);
        Console.Write("  SENT: " + context.Message.When);
        Console.Write("  PROCESSED: " + DateTime.Now);
        Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
        return Task.FromResult(0);
    }
}

How would I go about creating a callback queue in the consumer?

Upvotes: 1

Views: 2261

Answers (2)

adam

Reputation: 522

Thanks again to @Travis for the help. Just wanted to show the final code I ended up with for anyone in the future. The messaging looks funny for the response but it is correctly posting back to the publisher.

Publisher:

static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
        x.ReceiveEndpoint(host, "MtPubSubExample_TestPublisher", e =>
            e.Consumer<ResponseConsumer>());
    });
    var busHandle = bus.Start();
    var text = "";
    Console.WriteLine("Publisher");
    while(text != "quit")
    {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();

        var message = new SomethingHappenedMessage()
        {
            What = text,
            When = DateTime.Now
        };
        bus.Publish(message);
    }

    busHandle.Stop();
}

Response Consumer:

class ResponseConsumer : IConsumer<IResponse>
{
    public Task Consume(ConsumeContext<IResponse> context)
    {
        Console.WriteLine("RESPONSE MESSAGE: " + context.Message.Message);
        return Task.FromResult(0);
    }
}

Subscriber:

static void Main(string[] args)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
        x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
            e.Consumer<SomethingHappenedConsumer>());
    });
    Console.WriteLine("Subscriber");
    var busHandle = bus.Start();
    Console.ReadKey();
    busHandle.Stop();
}

Subscriber Consumer:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
    public Task Consume(ConsumeContext<ISomethingHappened> context)
    {
        var now = DateTime.Now;
        Console.Write("TXT: " + context.Message.What);
        Console.Write("  SENT: " + context.Message.When);
        Console.Write("  PROCESSED: " + now);
        Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");

        var response = new ResponseMessage()
        {
            Message = "The request was processed at " + now
        };

        // Publish through the consume context instead of creating a second,
        // never-started bus (and a new connection) for every consumer instance.
        // The returned Task completes once the response has been published.
        return context.Publish(response);
    }
}
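
The message contracts are not shown in either post. A minimal sketch of what they might look like, inferred from how the code above uses them: the property names What, When and Message come from the snippets, while the property types and the interface/class split are assumptions.

```csharp
using System;

// Assumed contract for the event the publisher raises.
public interface ISomethingHappened
{
    string What { get; }
    DateTime When { get; }
}

// Assumed concrete message; implements the interface the subscriber consumes.
public class SomethingHappenedMessage : ISomethingHappened
{
    public string What { get; set; }
    public DateTime When { get; set; }
}

// Assumed contract for the callback sent back to the publisher.
public interface IResponse
{
    string Message { get; }
}

// Assumed concrete response; implements the interface ResponseConsumer handles.
public class ResponseMessage : IResponse
{
    public string Message { get; set; }
}
```

MassTransit routes by message type, so the publisher and the subscriber should both reference the same contract types (same namespace and type names), typically from a shared assembly.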

Upvotes: 1

Travis

Reputation: 10567

In your consumer, just call Bus.Publish(new ResponseMessage()); (or whatever you call your response) and have your publisher register a consumer for that message type. Your publisher doesn't appear to be bound to a queue, so just make up a queue name and add a receive endpoint for it as well.
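
For the long-running case in the question, the same idea could be sketched with an async consumer. This is only a sketch under assumptions: the namespace, the LongRunningConsumer name, the contract property types and the Task.Delay stand-in for the real job are all hypothetical, and the response is published through the ConsumeContext rather than a separate bus instance.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

namespace MtPubSubExample // hypothetical namespace
{
    // Assumed contracts; only the property names appear in the thread.
    public interface ISomethingHappened { string What { get; } DateTime When { get; } }
    public interface IResponse { string Message { get; } }

    public class LongRunningConsumer : IConsumer<ISomethingHappened>
    {
        public async Task Consume(ConsumeContext<ISomethingHappened> context)
        {
            // Hypothetical stand-in for the real job; awaiting it does not
            // hold a thread while the work is in progress.
            await Task.Delay(TimeSpan.FromSeconds(5));

            // Publish the callback once the job is done. MassTransit builds an
            // IResponse message from the anonymous object's properties.
            await context.Publish<IResponse>(new
            {
                Message = "Finished '" + context.Message.What + "' at " + DateTime.Now
            });
        }
    }
}
```

The publisher side then only needs a receive endpoint with a consumer for IResponse, as described above; it never waits on the request itself.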

Upvotes: 1
