Reputation: 801
I'm trying to combine an IObservable that contains request information with an IObservable that contains response information. Both the request and response information contain an Id and a timestamp in the form of a DateTime. The Id on the Request will match the Id on the Response.
I need to combine the two streams based on Id and calculate the time elapsed between the Request's timestamp and the Response's timestamp. What is the best way to accomplish this?
Upvotes: 1
Views: 195
Reputation: 2652
You could join by coincidence.
from request in requests // 1
join response in responses // 2
on responses.Where(response => response.Id == request.Id) // 1,3
equals Observable.Empty<Unit>() // 2
where request.Id == response.Id // 4
select response.Time - request.Time // 5
Think of it like this:
Update:
Request windows are opened by from and closed by on.
Response windows are opened by join and closed by equals. The response duration specified by equals is Empty, thus responses aren't actually windows - they are treated as point events and simply projected once into whatever request windows are currently open.
#3 requires a Where operator because it defines the duration of each request; i.e., it indicates when a request window closes, which is when a response with a matching ID arrives.
#4 is required because of #3. Think about it - we're cross-joining every response with every request window that's currently open. Request windows only close when a matching response arrives. That means every response that doesn't match is paired with every request, but you only care about the last response - the one that matches.
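For reference, the same join can be written in method syntax with Observable.Join. The following is only a sketch: the Request and Response records, the Subject-based sources, and the sample timestamps are assumptions made for illustration (the question only says each message has an Id and a DateTime), and it assumes the System.Reactive package and C# 9 or later. It also assumes the responses source is hot, since it is subscribed to both as the right side of the join and as the duration of every request.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical hot sources standing in for the real request/response streams.
var requests = new Subject<Request>();
var responses = new Subject<Response>();

IObservable<TimeSpan> elapsed = requests
    .Join(
        responses,
        // #3: a request window stays open until a response with the same Id arrives.
        request => responses.Where(response => response.Id == request.Id),
        // Responses are point events: their window closes immediately.
        _ => Observable.Empty<Unit>(),
        (request, response) => (request, response))
    // #4: drop pairings of a response with other requests that are still open.
    .Where(pair => pair.request.Id == pair.response.Id)
    // #5: project the elapsed time.
    .Select(pair => pair.response.Time - pair.request.Time);

using var subscription = elapsed.Subscribe(
    e => Console.WriteLine($"Elapsed: {e.TotalMilliseconds} ms"));

// Two overlapping requests whose responses arrive out of order.
var start = new DateTime(2024, 1, 1, 12, 0, 0);
requests.OnNext(new Request(1, start));
requests.OnNext(new Request(2, start.AddMilliseconds(10)));
responses.OnNext(new Response(2, start.AddMilliseconds(40)));
responses.OnNext(new Response(1, start.AddMilliseconds(250)));

// Hypothetical message types; only Id and Time come from the question.
record Request(int Id, DateTime Time);
record Response(int Id, DateTime Time);
```

Because each response is compared against every open request window, the cost of a response grows with the number of requests still waiting for an answer.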
Upvotes: 3
Reputation: 747
If you don't have concurrent requests, Zip is probably the operator you are looking for:
Zip will combine two observables into a single sequence that produces elements at a rate that is defined by the slowest observable being combined.
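A minimal sketch of the Zip approach follows. Zip pairs elements by position, not by Id, so this only gives correct results when responses arrive in the same order as their requests. The Request and Response records, the Subject-based sources, and the sample timestamps are assumptions for illustration; the code assumes the System.Reactive package and C# 9 or later.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical sources standing in for the real request/response streams.
var requests = new Subject<Request>();
var responses = new Subject<Response>();

// Pair the n-th request with the n-th response and compute the elapsed time.
IObservable<TimeSpan> elapsed = requests.Zip(
    responses,
    (request, response) => response.Time - request.Time);

using var subscription = elapsed.Subscribe(
    e => Console.WriteLine($"Elapsed: {e.TotalMilliseconds} ms"));

// Strictly sequential traffic: each response arrives before the next request.
var start = new DateTime(2024, 1, 1, 12, 0, 0);
requests.OnNext(new Request(1, start));
responses.OnNext(new Response(1, start.AddMilliseconds(120)));
requests.OnNext(new Request(2, start.AddSeconds(1)));
responses.OnNext(new Response(2, start.AddSeconds(1).AddMilliseconds(80)));

// Hypothetical message types; only Id and Time come from the question.
record Request(int Id, DateTime Time);
record Response(int Id, DateTime Time);
```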
If you do have concurrent requests, you should probably try a different approach that does not require combining the two observables. You could, for instance, store each request's ID and DateTime in a ConcurrentDictionary and subscribe to the responses observable, looking up the corresponding request in the ConcurrentDictionary for each response (don't forget to remove the entry after reading it).
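The dictionary approach could look roughly like the sketch below. The Request and Response records, the Subject-based sources, and the name pending are assumptions for illustration, and the code assumes the System.Reactive package and C# 9 or later. It also assumes every request is observed before its response; a response with no recorded request is silently dropped here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical sources standing in for the real request/response streams.
var requests = new Subject<Request>();
var responses = new Subject<Response>();

// Request timestamps that are still waiting for a response, keyed by Id.
var pending = new ConcurrentDictionary<int, DateTime>();

// Record each request as it arrives.
using var requestSubscription = requests.Subscribe(
    request => pending[request.Id] = request.Time);

// For each response, look up and remove the matching request in one step.
IObservable<TimeSpan> elapsed = responses.SelectMany(response =>
    pending.TryRemove(response.Id, out var requestTime)
        ? Observable.Return(response.Time - requestTime)
        : Observable.Empty<TimeSpan>());

using var responseSubscription = elapsed.Subscribe(
    e => Console.WriteLine($"Elapsed: {e.TotalMilliseconds} ms"));

// Two concurrent requests whose responses arrive out of order.
var start = new DateTime(2024, 1, 1, 12, 0, 0);
requests.OnNext(new Request(1, start));
requests.OnNext(new Request(2, start.AddMilliseconds(10)));
responses.OnNext(new Response(2, start.AddMilliseconds(40)));
responses.OnNext(new Response(1, start.AddMilliseconds(250)));

// Hypothetical message types; only Id and Time come from the question.
record Request(int Id, DateTime Time);
record Response(int Id, DateTime Time);
```

TryRemove does the lookup and the removal atomically, so the entry cannot be read twice or left behind if responses are delivered on different threads.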
Upvotes: 0