Reputation: 12695
I have an Observable generated from a regular .NET event. The Observable is hot - not warm - in the sense that it starts producing values even before any subscription, and each new subscriber receives the latest produced value. Let's name this eventStream.
Then I have another Observable, exposed by another class, which represents some state flow, so every new value gives the current state of something managed by that class. This Observable is hot as well. Let's name it stateStream.
Every time the event sequence produces a new value, I want to pick (I'd say sample, but that might lead to confusion) the latest value provided by the state sequence. This should produce a new sequence that combines the two values, which can then be processed further.
This is what I came up with, but it does not seem to work:
var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;
eventStream.Select(eventValue =>
        stateStream
            .Take(1)
            .Select(stateValue => new { Event = eventValue, State = stateValue }))
    .Switch()
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
    .Subscribe(value => /* do something */);
The rationale is taken from other similar scenarios I have worked with, where a new value produced by some source causes a new subscription to run, so a new Observable gets returned, and finally the IObservable<IObservable<...>> gets flattened into a one-dimensional IObservable again using Switch() or some similar operator.
But in this case, from a quick test, there seems to be no new subscription, and only the very first stateStream value gets produced. Instead I'd like to pick the first value (Take(1)) every time the eventStream fires.
AFAIK, CombineLatest and Zip do not fit the bill: CombineLatest fires every time either of the two sequences provides a new value; Zip fires every time both sequences have a new value available, which typically means at the pace of the slower of the two. And/Then/When should not be right either, for the same reason as Zip.
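To make the difference concrete, here is a minimal sketch that feeds the same inputs through both operators. It assumes the System.Reactive package is referenced; the class name CombineVsZipDemo and the two Subject instances (events, states) are hypothetical stand-ins for eventStream and stateStream, not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineVsZipDemo
{
    // Feeds the same hypothetical inputs through the given operator
    // and records every pair it emits.
    public static List<string> Run(
        Func<IObservable<string>, IObservable<int>, IObservable<string>> combine)
    {
        var events = new Subject<string>(); // stand-in for eventStream
        var states = new Subject<int>();    // stand-in for stateStream
        var seen = new List<string>();

        using (combine(events, states).Subscribe(x => seen.Add(x)))
        {
            events.OnNext("a");
            states.OnNext(1);
            states.OnNext(2);
            states.OnNext(3);
            events.OnNext("b");
        }
        return seen;
    }

    // Emits on every notification from either side once both have a value.
    public static List<string> WithCombineLatest() =>
        Run((e, s) => e.CombineLatest(s, (ev, st) => ev + ":" + st));

    // Pairs values by position, so "b" is matched with the second state.
    public static List<string> WithZip() =>
        Run((e, s) => e.Zip(s, (ev, st) => ev + ":" + st));
}
```

With these inputs, WithCombineLatest() records a:1, a:2, a:3, b:3 (one pair per notification from either side), while WithZip() records a:1, b:2 (the event "b" is paired with a stale state). What I am after is a single b:3 when "b" fires.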
I've also checked the SO thread "combining one observable with latest from another observable", but I don't think it applies here. In one of the comments I read:
"[...] and then Scan acts like a CombineLatest that filters for notifications from only one side"
That sounded familiar, but I could not wrap my head around it.
Upvotes: 7
Views: 1976
Reputation: 18663
The latest (beta) version of Rx.NET, 2.3.0-beta2, has WithLatestFrom:
// Only emits when eventStream emits
eventStream.WithLatestFrom(stateStream.StartWith(defaultState),
                           (evt, state) => new { Event = evt, State = state })
           .Subscribe(/*Do something*/);
If that version is not an option, you can fill the gap in a pinch with the following (note: untested):
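public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
    this IObservable<TLeft> source,
    IObservable<TRight> other,
    Func<TLeft, TRight, TResult> resultSelector)
{
    // Publish shares one subscription to source across the inner sequences;
    // Switch keeps only the projection built from the latest value of other.
    return source.Publish(os =>
        other.Select(a => os
                 .Select(b => resultSelector(b, a)))
             .Switch());
}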
Source courtesy of @JamesWorld if I am not mistaken.
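As a rough illustration of the behaviour, here is a minimal sketch using the built-in operator. It assumes a System.Reactive version that ships WithLatestFrom; the class name WithLatestFromDemo, the two Subject instances, and the default state 0 are hypothetical and only stand in for eventStream, stateStream and defaultState.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WithLatestFromDemo
{
    public static List<string> Run()
    {
        var events = new Subject<string>(); // stand-in for eventStream
        var states = new Subject<int>();    // stand-in for stateStream
        var seen = new List<string>();

        // StartWith(0) supplies a hypothetical default state so that an
        // event arriving before the first state is not dropped.
        using (events.WithLatestFrom(states.StartWith(0), (e, s) => e + ":" + s)
                     .Subscribe(x => seen.Add(x)))
        {
            events.OnNext("a"); // no state yet, pairs with the default
            states.OnNext(1);   // state changes alone emit nothing
            states.OnNext(2);
            events.OnNext("b"); // pairs with the latest state
            events.OnNext("c"); // latest state is reused
        }
        return seen;
    }
}
```

Run() should return a:0, b:2, c:2: one pair per event, each carrying whatever the state stream emitted most recently.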
Upvotes: 0
Reputation: 117027
It seems to me that your current solution is actually pretty close, but it sounds like you just need to swap the eventStream and stateStream observables around and then remove the .Take(1).
Try this:
stateStream
    .Select(stateValue =>
        eventStream
            .Select(eventValue => new { Event = eventValue, State = stateValue }))
    .Switch()
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
    .Subscribe(value => { /* do something */ });
Depending on how stateStream is configured you may need to add a .StartWith(...) to it to get the initial values from eventStream, but I think this approach covers your requirements.
Upvotes: 1
Reputation: 15772
I think you want Observable.Sample():
stateSource.Sample(eventSource)
           .Zip(eventSource,...)
Upvotes: 4
Reputation: 29776
You'll need a default value for the state stream, in case it hasn't ever emitted. Pass this default as the argument to MostRecent (I just used null here) and use the overload of Zip that expects an IEnumerable:
eventStream.Zip(
        stateStream.MostRecent(null),
        (evt, state) => new { Event = evt, State = state })
    .Subscribe(/* etc */);
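For illustration, a minimal sketch of the same idea with concrete inputs. It assumes the System.Reactive package; the class name MostRecentZipDemo, the two Subject instances, and the default of -1 (used here instead of null because the hypothetical state is an int) are not from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MostRecentZipDemo
{
    public static List<string> Run()
    {
        var events = new Subject<string>(); // stand-in for eventStream
        var states = new Subject<int>();    // stand-in for stateStream
        var seen = new List<string>();

        // MostRecent(-1) exposes the state stream as an IEnumerable that
        // always yields the latest value, or -1 before any state arrives.
        using (events.Zip(states.MostRecent(-1), (e, s) => e + ":" + s)
                     .Subscribe(x => seen.Add(x)))
        {
            events.OnNext("a"); // no state yet, pairs with the default
            states.OnNext(1);
            states.OnNext(2);
            events.OnNext("b"); // pairs with the latest state
            events.OnNext("c"); // latest state is reused
        }
        return seen;
    }
}
```

Run() should return a:-1, b:2, c:2, because Zip pulls one element from the enumerable per event and MostRecent always has one to give.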
Upvotes: 0