Reputation: 27360
Scenario:
I send request to server queue (I'm using RabbintMQ to asynchronously process message). When the server processes the request it produces two responses to different queues.
I would like to use RX to subscribe to to responses, but also have access to their corresponding requests.
What I have so far:
I use EventAggregator that uses reactive extension and exposes IObservable for event stream:
public interface IEventAggregator
{
void Publish<TEvent>(TEvent sampleEvent);
IObservable<TEvent> GetEvent<TEvent>();
}
When I send request I publish an event:
eventAggregator.Publish(new RequestSent { ID = Guid.NewGuid(), ... })
//I could later subscribe to the event like this:
//eventAggregator.GetEvent<RequestSent>().Subscribe(..)
When server responds, the responses are also published to the EventAggregator, so I can subscribe to them:
eventAggregator.GetEvent<ResponseFromQueue1>().Subscribe(OnResponseFromQueue1)
eventAggregator.GetEvent<ResponseFromQueue2>().Subscribe(OnResponseFromQueue2)
I could also subscribe to RequestSent
What I need:
private void OnResponseFromQueue1(RequestSent request, ResponseFromQueue1 response)
{
I need access to both request and respone
}
and this would be even better:
private void OnResponse(
RequestSent request,
ResponseFromQueue1 response1,
ResponseFromQueue2 response2)
{
//actually, this would simplify implementation of my logic a lot
}
Is is possible using RX?
Upvotes: 3
Views: 734
Reputation: 18663
You could use SelectMany
, assuming you can use something like the ID to associate the request with the responses
trigger.SelectMany(requestData => {
//We need to share this
var id = Guid.NewGuid();
//Publish your event
eventAggregator.Publish(new Request { ID = id, /*...*/ });
//Start listening for the two responses
//Grab only the first item matching the IDs
var left = eventAggregator.GetEvent<ResponseFromQueue1>().First(res => res.ID == id);
var right = eventAggregator.GetEvent<ResponseFromQueue2>().First(res => res.Id == id);
//We are done once both values have emitted.
return left.Zip(right);
}, (request, responses) => {
/*Here you will have access to the request and an array of the responses*/
});
One thing to bear in mind is that this code right now is assuming that Publish
will return before the responses come back. Since you said this is RabbitMQ that is probably a safe assumption, but something to bear in mind if you do any unit testing with this.
Edit
It seems in your scenario you actually would have:
//Set up the queue first
eventAggregator.GetEvent<RequestSent>()
.SelectMany(requestSent => {
var id = requestSent.ID;
var left = eventAggregator.GetEvent<ResponseFromQueue1>().First(res => res.ID == id);
var right = eventAggregator.GetEvent<ResponseFromQueue2>().First(res => res.ID == id);
return left.Zip(right);
}, (request, response) => {/**/});
//...Sometime later
eventAggregator.Publish(new Request{});
Upvotes: 3