superjos

Reputation: 12695

Pick Observable latest value when any value is produced by another Observable

I have an Observable generated from a regular .NET event. The Observable is hot - not warm - in the sense that it starts producing values even before any subscription, and every time someone subscribes it will receive the latest produced value. Let's name this eventStream.

Then I have another Observable, exposed by another class, which represents some state flow, so every new value gives the current state of something managed by that class. This Observable is hot as well. Let's name it stateStream.

Every time the event sequence produces a new value, I want to pick (I'd say sample, but that might lead to confusion) the latest value provided by the state sequence. This should produce a new sequence that combines the two values, so they can then be processed, etc.

This is what I came up with, but it does not seem to work:

var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;

eventStream.Select(eventValue => 
  stateStream
    .Take(1)
    .Select(stateValue => new { Event = eventValue, State = stateValue }))
  .Switch()
  .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
  .Subscribe(value => /* do something */);

The rationale behind that is taken from other similar scenarios I worked with, where a new value produced by some source causes a new subscription to run, thus a new Observable gets returned, and finally the IObservable<IObservable<...>> gets squashed into a one-dimensional IObservable again using Switch() or some similar operator.
But in this case, from a quick test, there seems to be no new subscription, and only the very first stateStream value gets produced. Instead I'd like to pick the first value (Take(1)) every time the eventStream fires.

AFAIK, CombineLatest and Zip cannot fit the bill: CombineLatest fires every time either of the two sequences provides a new value; Zip fires every time both sequences have a new value available, and typically this means it runs at the pace of the slower of the two. And/Then/When should not be right either, for the same reason as Zip.
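To make that difference concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The two Subject<T> instances are hypothetical stand-ins for eventStream and stateStream, and the string pairs are only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestVsZipDemo
{
    public static (List<string> Combined, List<string> Zipped) Run()
    {
        // Hypothetical stand-ins for the two hot streams.
        var eventStream = new Subject<string>();
        var stateStream = new Subject<int>();
        var combined = new List<string>();
        var zipped = new List<string>();

        using (eventStream.CombineLatest(stateStream, (e, s) => e + ":" + s)
                          .Subscribe(combined.Add))
        using (eventStream.Zip(stateStream, (e, s) => e + ":" + s)
                          .Subscribe(zipped.Add))
        {
            stateStream.OnNext(1);
            stateStream.OnNext(2);
            eventStream.OnNext("a");
            eventStream.OnNext("b");
            stateStream.OnNext(3); // CombineLatest fires here too; Zip only queues it
        }

        return (combined, zipped);
    }

    public static void Main()
    {
        var (combined, zipped) = Run();
        Console.WriteLine("CombineLatest: " + string.Join(", ", combined));
        Console.WriteLine("Zip:           " + string.Join(", ", zipped));
    }
}
```

In this run CombineLatest also emits for the state change that arrives after "b", while Zip pairs "a" with the stale state 1 because it matches values by position. Neither gives "one output per event, carrying the latest state".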

I've also checked the SO thread "combining one observable with latest from another observable", but I don't think that can apply here. Only in one of the comments I read

[...] and then Scan acts like a CombineLatest that filters for notifications from only one side

and somehow it sounded familiar, but I could not wrap my head around that.

Upvotes: 7

Views: 1976

Answers (4)

paulpdaniels

Reputation: 18663

The latest (beta) version of Rx.Net 2.3.0-beta2 has WithLatestFrom:

//Only emits when eventStream emits
eventStream.WithLatestFrom(stateStream.StartWith(defaultState), 
                           (evt, state) => new {Event = evt, State = state})
           .Subscribe(/*Do something*/);

If you can't use that version, you can fill the gap in a pinch with something like this (note: untested):

public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
    this IObservable<TLeft> source,
    IObservable<TRight> other,
    Func<TLeft, TRight, TResult> resultSelector)
{
    return source.Publish(os =>
        other.Select(a => os
            .Select(b => resultSelector(b,a)))
            .Switch());
}

Source courtesy of @JamesWorld if I am not mistaken.
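For a quick check of the WithLatestFrom semantics, here is a minimal sketch, assuming a System.Reactive version that ships WithLatestFrom. The Subject<T> instances are hypothetical stand-ins for the two hot streams, and 0 is an arbitrary default state chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WithLatestFromDemo
{
    public static List<string> Run()
    {
        // Hypothetical stand-ins for eventStream and stateStream.
        var eventStream = new Subject<string>();
        var stateStream = new Subject<int>();
        var results = new List<string>();

        using (eventStream
            .WithLatestFrom(stateStream.StartWith(0), (evt, state) => evt + ":" + state)
            .Subscribe(results.Add))
        {
            eventStream.OnNext("a"); // no state produced yet, so the default 0 is used
            stateStream.OnNext(1);   // state changes on their own emit nothing
            stateStream.OnNext(2);
            eventStream.OnNext("b"); // pairs with the latest state, 2
            eventStream.OnNext("c"); // state unchanged, still 2
        }

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Without the StartWith, events that arrive before the first state value would be dropped, since there is nothing to pair them with.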

Upvotes: 0

Enigmativity

Reputation: 117027

It seems to me that your current solution is actually pretty close, but it sounds like you just need to swap the eventStream & stateStream observables around and then remove the .Take(1).

Try this:

stateStream
    .Select(stateValue => 
        eventStream
            .Select(eventValue => new { Event = eventValue, State = stateValue }))
    .Switch()
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
    .Subscribe(value => { /* do something */ });

Depending on how stateStream is configured you may need to add a .StartWith(...) to it to get the initial values from eventStream, but I think this approach covers your requirements.

Upvotes: 1

Aron

Reputation: 15772

I think you want Observable.Sample()

stateStream.Sample(eventStream)
     .Zip(eventStream,...)

Upvotes: 4

James World

Reputation: 29776

You'll need a default value for the state stream, in case it hasn't ever emitted. Pass this default as the argument to MostRecent (I just used null here) and use the overload of Zip that expects an IEnumerable:

eventStream.Zip(
    stateStream.MostRecent(null),
    (evt,state) => new { Event = evt, State = state })
.Subscribe(/* etc */);
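Here is a minimal sketch of how that combination behaves, assuming the System.Reactive package is referenced. The Subject<T> instances are hypothetical stand-ins for the two streams, and -1 is an arbitrary default state used in place of null because the state is an int here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MostRecentZipDemo
{
    public static List<string> Run()
    {
        // Hypothetical stand-ins for eventStream and stateStream.
        var eventStream = new Subject<string>();
        var stateStream = new Subject<int>();
        var results = new List<string>();

        // MostRecent exposes the state as a non-blocking IEnumerable<int>;
        // Zip pulls one element from it for every event.
        using (eventStream
            .Zip(stateStream.MostRecent(-1), (evt, state) => evt + ":" + state)
            .Subscribe(results.Add))
        {
            eventStream.OnNext("a"); // nothing observed yet, so the default -1 is used
            stateStream.OnNext(1);
            stateStream.OnNext(2);
            eventStream.OnNext("b"); // most recent state is 2
            eventStream.OnNext("c"); // repeated reads return the same value
        }

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Because MostRecent never blocks and repeats the last value, every event produces exactly one output, whether or not the state has changed in between.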

Upvotes: 0
