Jorik

Reputation: 239

Send message to specific channel/routing key with MassTransit/RabbitMQ in C#

I've been working on an application that starts some worker roles based on messaging.

This is the way I want the application to work:

Client sends a request for work (RPC).

One of the worker roles accepts the work, generates a random id, and responds to the RPC with the new id.
The worker will post its debug logs on a log channel with the id.
The client will subscribe to this channel so users can see what's going on.

The RPC is working fine, but I can't seem to figure out how to implement the log-sending.

This is the code that accepts work (simplified):

var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
    var host = sbc.Host(new Uri("rabbitmq://xxxxxx.nl"), h =>
    {
        h.Username("xxx");
        h.Password("xxxx");
    });


    sbc.ReceiveEndpoint(host, "post_work_item", e =>
    {
        e.Consumer<CreateWorkItemCommand>();
    });

    sbc.ReceiveEndpoint(host, "list_work_items", e =>
    {
        e.Consumer<ListWorkItemsCommand>();
    });
});

The CreateWorkItemCommand consumer will create the thread, do the work, etc. Now, how would I implement the log-sending with MassTransit? I was thinking of something like:

bus.Publish( 
     obj: WorkUpdate{ Message = "Hello world!" }, 
     channel: $"work/{work_id}"
)

And the client will do something like this:

bus.ReceiveFromEvented($"work/{rpc.work_id}").OnMessage += { more_pseudo_code() }

I can't seem to find out how to do this.

Can anyone help me out?

Thanks!

Upvotes: 1

Views: 1415

Answers (1)

Alexey Zimarev

Reputation: 19630

This looks like a case for both a saga and Turnout. The current Turnout implementation monitors the job itself, and I doubt you can really subscribe to that message flow. Turnout is also still not really finished.

You might solve this using a saga. Some external trigger (a command) starts the saga, which uses Request/Response to start the process that does the work and gets back its correlation id (the job id). The long-running job can then publish progress reports using the same correlation id, and the saga will consume them and do whatever it needs to do.

The "work/{rpc.work_id}" channel is then replaced by the correlation id.
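
To make the correlation idea concrete, here is a minimal sketch. It is not the saga itself, only the simpler part: progress messages that carry the job id as their correlation id, and a plain consumer on the client that keeps the ones for its own job. WorkUpdate comes from the question; WorkLogConsumer, Matches, LogClient and the "work_log_..." queue name are hypothetical names chosen for illustration, and the host URI and credentials are the placeholders from the question.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Progress message. CorrelatedBy<Guid> is MassTransit's interface for
// messages that carry a correlation id; here it holds the job id.
public class WorkUpdate : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; set; }
    public string Message { get; set; }
}

// Hypothetical client-side consumer: it receives every WorkUpdate that
// reaches its queue and forwards only those belonging to one job.
public class WorkLogConsumer : IConsumer<WorkUpdate>
{
    private readonly Guid _workId;
    private readonly Action<string> _onLog;

    public WorkLogConsumer(Guid workId, Action<string> onLog)
    {
        _workId = workId;
        _onLog = onLog;
    }

    // True when the update belongs to the job this consumer watches.
    public bool Matches(WorkUpdate update)
    {
        return update.CorrelationId == _workId;
    }

    public Task Consume(ConsumeContext<WorkUpdate> context)
    {
        if (Matches(context.Message))
            _onLog(context.Message.Message);
        return Task.FromResult(0);
    }
}

public static class LogClient
{
    // Builds a bus whose receive endpoint uses a queue name unique to this
    // job (an assumption of this sketch), so each client gets its own copy
    // of the published updates.
    public static IBusControl Create(Guid workId, Action<string> onLog)
    {
        return Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            var host = sbc.Host(new Uri("rabbitmq://xxxxxx.nl"), h =>
            {
                h.Username("xxx");
                h.Password("xxxx");
            });

            sbc.ReceiveEndpoint(host, "work_log_" + workId.ToString("N"), e =>
            {
                e.Consumer(() => new WorkLogConsumer(workId, onLog));
            });
        });
    }
}
```

On the worker side, the log call from the question would then become something like `await bus.Publish(new WorkUpdate { CorrelationId = workId, Message = "Hello world!" });`. Note the trade-off: filtering happens in the consumer, so every client queue still receives the updates of every job. That is probably acceptable for low volumes, but it is a simplification and not a per-job channel.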

Upvotes: 1
