sevzas

Reputation: 801

Reactive Extensions combining two IObservables into one

I'm trying to combine an IObservable that contains request information with an IObservable that contains response information. Both the request and response information contain an Id and a timestamp in the form of a DateTime. The Id on the Request will match the Id on the Response.

I need to combine the two streams based on Id and calculate the time elapsed between the Request's timestamp and the Response's timestamp. What is the best way to accomplish this?

Upvotes: 1

Views: 195

Answers (2)

Dave Sexton

Reputation: 2652

You could join by coincidence.

from request in requests             // 1
join response in responses           // 2
on responses.Where(response => response.Id == request.Id) // 1,3
equals Observable.Empty<Unit>()      // 2
where request.Id == response.Id      // 4
select response.Time - request.Time  // 5

Think of it like this:

  1. Create a window for each request.
  2. Project every response into every request window.
  3. When a response matches a request, close that request window.
  4. For each request, ignore all responses that do not match.
  5. Project the time difference for the response that does match.

Update:

Request windows are opened by from and closed by on.

Response windows are opened by join and closed by equals. The response duration specified by equals is Empty, thus responses aren't actually windows - they are treated as point events and simply projected once into whatever request windows are currently open.

#3 requires a Where operator because it defines the duration of each request; i.e., it indicates when a request window closes, which is when a response with a matching ID arrives.

#4 is required because of #3. Think about it - we're cross-joining every response with every request window that's currently open. Request windows only close when a matching response arrives. That means every response that doesn't match is paired with every request, but you only care about the last response - the one that matches.
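The same join can be written in method syntax, which makes the two duration selectors explicit. This is only a sketch under assumptions: the Request and Response record types are hypothetical (the question only says each carries an Id and a DateTime), the System.Reactive package is referenced, and both streams are hot (Subjects here), since responses is subscribed once by the join and again for every request window.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical message shapes, assumed for illustration only.
public record Request(int Id, DateTime Time);
public record Response(int Id, DateTime Time);

public static class JoinExample
{
    public static void Main()
    {
        var requests = new Subject<Request>();
        var responses = new Subject<Response>();

        IObservable<TimeSpan> elapsed = requests
            .Join(
                responses,
                // Steps 1 and 3: a request window stays open until a response with the same Id arrives.
                request => responses.Where(r => r.Id == request.Id),
                // Step 2: responses are point events, so their "window" closes immediately.
                _ => Observable.Empty<Unit>(),
                (request, response) => new { request, response })
            // Step 4: drop pairings with responses that belong to other requests.
            .Where(pair => pair.request.Id == pair.response.Id)
            // Step 5: project the elapsed time for the matching pair.
            .Select(pair => pair.response.Time - pair.request.Time);

        using var subscription = elapsed.Subscribe(t => Console.WriteLine(t));

        var start = new DateTime(2024, 1, 1, 12, 0, 0);
        requests.OnNext(new Request(1, start));
        requests.OnNext(new Request(2, start.AddSeconds(1)));
        responses.OnNext(new Response(2, start.AddSeconds(4))); // 3 s after request 2
        responses.OnNext(new Response(1, start.AddSeconds(5))); // 5 s after request 1
    }
}
```

Because responses may arrive out of order here, the result sequence follows response order rather than request order.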

Upvotes: 3

Tiago

Reputation: 747

If you don't have concurrent requests, Zip is probably the operator you are looking for:

http://rxmarbles.com/#zip

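Zip combines two observables into a single sequence by pairing their elements by position (first with first, second with second, and so on), so it produces elements at the rate of the slower of the two observables. It does not look at the Id, which is why it only works when each response arrives before the next request is sent.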
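A minimal sketch of the Zip approach, assuming the System.Reactive package and hypothetical Request and Response record types (the question only specifies an Id and a DateTime on each):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical message shapes, assumed for illustration only.
public record Request(int Id, DateTime Time);
public record Response(int Id, DateTime Time);

public static class ZipExample
{
    public static void Main()
    {
        var requests = new Subject<Request>();
        var responses = new Subject<Response>();

        // Pairs the n-th request with the n-th response; Ids are never compared.
        IObservable<TimeSpan> elapsed = requests.Zip(
            responses,
            (request, response) => response.Time - request.Time);

        using var subscription = elapsed.Subscribe(t => Console.WriteLine(t));

        var start = new DateTime(2024, 1, 1, 12, 0, 0);
        requests.OnNext(new Request(1, start));
        responses.OnNext(new Response(1, start.AddSeconds(2)));  // 2 s after request 1
        requests.OnNext(new Request(2, start.AddSeconds(10)));
        responses.OnNext(new Response(2, start.AddSeconds(11))); // 1 s after request 2
    }
}
```

If responses could ever arrive in a different order than their requests, this pairing would silently mix up unrelated messages.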

If you do have concurrent requests, you should probably try a different approach that does not require combining the two observables. For instance, you could store each request's Id and DateTime in a ConcurrentDictionary, then subscribe to the responses observable and look up the corresponding request in the dictionary (don't forget to remove the entry after reading it).
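The dictionary approach could look roughly like this. It is a sketch, assuming the System.Reactive package and hypothetical Request and Response record types; TryRemove reads and removes the entry in one step, which covers the cleanup mentioned above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Subjects;

// Hypothetical message shapes, assumed for illustration only.
public record Request(int Id, DateTime Time);
public record Response(int Id, DateTime Time);

public static class DictionaryExample
{
    public static void Main()
    {
        var requests = new Subject<Request>();
        var responses = new Subject<Response>();

        // Request timestamps that are still waiting for a response, keyed by Id.
        var pending = new ConcurrentDictionary<int, DateTime>();

        using var requestSubscription =
            requests.Subscribe(request => pending[request.Id] = request.Time);

        using var responseSubscription = responses.Subscribe(response =>
        {
            // Look up and remove the matching request atomically.
            if (pending.TryRemove(response.Id, out var requestTime))
            {
                Console.WriteLine($"{response.Id}: {response.Time - requestTime}");
            }
        });

        var start = new DateTime(2024, 1, 1, 12, 0, 0);
        requests.OnNext(new Request(1, start));
        requests.OnNext(new Request(2, start.AddSeconds(1)));
        responses.OnNext(new Response(2, start.AddSeconds(4))); // 3 s after request 2
        responses.OnNext(new Response(1, start.AddSeconds(5))); // 5 s after request 1
    }
}
```

Requests that never receive a response stay in the dictionary, so a long-running process would also need some form of expiry.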

Upvotes: 0
