Fallon

Reputation: 45

How to implement a simple reply in Rebus

public static void SendREsbDx(Job job)
{
    using (var adapter = new BuiltinContainerAdapter())
    {
        adapter.Handle<ReplyMsg>(msg =>
        {
            string mss = msg.message;
        });

        Configure.With(adapter)
            .Logging(l => l.ColoredConsole(LogLevel.Warn))
            .MessageOwnership(o => o.FromRebusConfigurationSection())
            .Transport(t => t.UseSqlServer("server=.;initial catalog=rebus_test;integrated security=true","consumerx","error")
                             .EnsureTableIsCreated())
            .CreateBus()
            .Start();
        adapter.Bus.Send<Job>(job);
    }
}

I am using the above code to send a message to a consumer. The consumer will call bus.Reply, but the above code obviously does not work.

I simply want to be able to receive a reply from the consumer. How would this be accomplished?

Upvotes: 3

Views: 2915

Answers (1)

mookid8000

Reputation: 18628

Sounds like your consumer does not have a handler for Job messages.

In your case, it sounds like you'll need two bus instances: a consumer instance with an implementation of IHandleMessages<Job> that calls bus.Reply(new ReplyMsg {...}), and a producer instance that calls bus.Send(new Job {...}) and has an implementation of IHandleMessages<ReplyMsg> that does whatever needs to be done with the reply.
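As a rough sketch of what those two handler implementations might look like as classes (rather than inline lambdas), assuming the Rebus 0.x API where IBus and IHandleMessages<T> live in the Rebus namespace: the JobId property and the class names JobHandler and ReplyHandler are made up for illustration, and only ReplyMsg.message comes from the question. How the handlers get registered with the container is left out here.

```csharp
using Rebus; // assumption: Rebus 0.x, which exposes IBus and IHandleMessages<T> here

// Message types. JobId is a hypothetical property; `message` matches msg.message in the question.
public class Job
{
    public int JobId { get; set; }
}

public class ReplyMsg
{
    public string message { get; set; }
}

// Consumer side: receives a Job and replies to the endpoint that sent it.
public class JobHandler : IHandleMessages<Job>
{
    private readonly IBus bus;

    public JobHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(Job job)
    {
        // ... do the actual work here ...
        bus.Reply(new ReplyMsg { message = "Job " + job.JobId + " done" });
    }
}

// Producer side: receives the reply that the consumer sent back.
public class ReplyHandler : IHandleMessages<ReplyMsg>
{
    // Keeps the most recent reply text so the rest of the application can inspect it.
    public string LastMessage { get; private set; }

    public void Handle(ReplyMsg reply)
    {
        LastMessage = reply.message;
    }
}
```

Note that the consumer never needs to know the producer's queue name: the reply is sent back to the return address of the incoming message, which is why the producer needs an input queue of its own.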

If you're interested in looking at some sample code that demonstrates request/reply, take a look at the integration sample in the Rebus samples repository which has some simple request/reply going on between the Client (which would correspond to the producer in your case) and the IntegrationService (which corresponds to the consumer).

The following code snippet demonstrates how it can be done:

var producer = new BuiltinContainerAdapter();
var consumer = new BuiltinContainerAdapter();

consumer.Handle<Job>(job => {
    ...
    consumer.Bus.Reply(new ReplyMsg {...});
});

producer.Handle<ReplyMsg>(reply => {
    ....
});

Configure.With(producer)
     .Transport(t => t.UseSqlServer(connectionString, "producer.input", "error")
                      .EnsureTableIsCreated())
     .MessageOwnership(o => o.FromRebusConfigurationSection())
     .CreateBus()
     .Start();

Configure.With(consumer)
     .Transport(t => t.UseSqlServer(connectionString, "consumer.input", "error")
                      .EnsureTableIsCreated())
     .MessageOwnership(o => o.FromRebusConfigurationSection())
     .CreateBus()
     .Start();

// for the duration of the lifetime of your application
producer.Bus.Send(new Job {...});


// when your application shuts down:
consumer.Dispose();
producer.Dispose();

In your app.config, there must be an endpoint mapping that maps Job to consumer.input:

<rebus>
    <endpoints>
         <add messages="SomeNamespace.Job, SomeAssembly" endpoint="consumer.input"/>
    </endpoints>
</rebus>

I hope you can see now why your code does not work. Please let me know if I should elaborate further :)

I've added a request/reply sample to the Rebus samples repository to serve as proof that the code shown above can actually run (provided that you remove the .... etc., of course - you need a basic understanding of C# to be able to use this code).

Upvotes: 3
