Benjol

Reputation: 66531

'Flushing' observable Scan

This is a weird 'problem', and I'm not sure what the best way to handle it is.

To simplify, let's say that I've got an observable source with data readings coming from 'outside':

{ Value, TimeStamp }

I'm putting that through Observable.Scan so that I can output:

{ Value, TimeStamp, TimeDelta }

This means that my data always comes out 'one late', but that's not a problem.
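For illustration, here is a minimal sketch of what such a Scan step might look like. It is not the actual code from the question: the `Reading` and `TimedReading` records and the `WithTimeDelta` helper are hypothetical names, and it assumes C# 9 or later with the System.Reactive package. It pairs each reading with the time until its follower, which is why every value comes out 'one late'.

```csharp
#nullable enable
using System;
using System.Reactive.Linq;

// Hypothetical shapes for the input and output data.
public record Reading(double Value, DateTime TimeStamp);
public record TimedReading(double Value, DateTime TimeStamp, TimeSpan TimeDelta);

public static class ScanSketch
{
    // Emits the previous reading, plus the time until the current one,
    // each time a new reading arrives. The latest reading is always held back.
    public static IObservable<TimedReading> WithTimeDelta(this IObservable<Reading> source) =>
        source
            .Scan(
                (Previous: (Reading?)null, Output: (TimedReading?)null),
                (state, current) => (
                    Previous: current,
                    Output: state.Previous is null
                        ? null
                        : new TimedReading(
                            state.Previous.Value,
                            state.Previous.TimeStamp,
                            current.TimeStamp - state.Previous.TimeStamp)))
            .Where(state => state.Output is not null)   // nothing to emit for the very first reading
            .Select(state => state.Output!);
}
```

The held-back reading lives in the Scan state, so it only goes away when the subscription that owns that state is disposed.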

We're 'recording' from this observable, and when you stop a recording, there's still one data value 'stuck' waiting for its follower.

Even that's not a problem. The problem is that when you start recording again, the last value from the previous 'recording' gets stuck onto the beginning of the new one.

The most obvious thing to do is just to unsubscribe and resubscribe, but it's not that simple: this scanned source is not only recorded, but also sent to the UI and used for further calculations down the line, so I'd have to do an enormous unsubscribe/resubscribe.

I'm trying to think of a way to inject some kind of 'reset' data, but I'm not sure how one goes about sending information back 'up' the observable stream...

Maybe I've just bitten off more than I can chew? Or used too much Observable?

Upvotes: 0

Views: 329

Answers (2)

Benjol

Reputation: 66531

This is what I've boiled the accepted answer down to. Not yet in production, but tests seem to show it does what I want.

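using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public interface IResetter
{
    IObservable<T> MakeResettable<T>(Func<IObservable<T>> selector);
}

public class Resetter : IResetter
{
    private readonly Subject<Unit> _Resetter = new Subject<Unit>();

    // Makes every resettable observable drop its current inner subscription and start a fresh one.
    public void Reset()
    {
        _Resetter.OnNext(Unit.Default);
    }

    public IObservable<T> MakeResettable<T>(Func<IObservable<T>> selector)
    {
        return
            _Resetter
                .StartWith(Unit.Default)                    // subscribe once up front, without waiting for a Reset()
                .Select(_ => Observable.Defer(selector))    // build a new inner observable per reset
                .Switch()                                   // unsubscribe from the old one, subscribe to the new one
                .Publish().RefCount();                      // share one subscription among all observers
    }
}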

Upvotes: 0

Enigmativity

Reputation: 117047

There are going to be a number of ways to do this, but one that is fairly easy is to use the .Switch() operator.

It essentially works like this: if you have an IObservable<IObservable<T>>, you can call .Switch() to turn it into an IObservable<T>. It subscribes to the latest observable produced by the outer observable and unsubscribes from the previously produced one.

Now that sounds a bit funky, but here's how it can work. Given an observable called outsideObservable, you define a second observable (resubscribeObservable) that produces a value every time you want to resubscribe, and you subscribe to them like this:
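To make the behaviour of .Switch() concrete, here is a small sketch using subjects as stand-ins. The names `SwitchSketch`, `outer`, `first` and `second` are made up for the example, and it assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchSketch
{
    public static List<string> Run()
    {
        var outer = new Subject<IObservable<string>>();
        var first = new Subject<string>();
        var second = new Subject<string>();
        var seen = new List<string>();

        using var subscription = outer.Switch().Subscribe(seen.Add);

        outer.OnNext(first);       // Switch subscribes to 'first'
        first.OnNext("a");         // forwarded
        outer.OnNext(second);      // Switch unsubscribes from 'first', subscribes to 'second'
        first.OnNext("ignored");   // no longer forwarded
        second.OnNext("b");        // forwarded

        return seen;               // contains "a", "b"
    }
}
```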

var subscription =
    resubscribeObservable
        .Select(_ => outsideObservable)
        .Switch()
        .Subscribe(x =>
        {
            /* Do stuff here */
        });

Now to resubscribe to outsideObservable you just have to produce a value from resubscribeObservable.

The easiest way to do this is to define it like var resubscribeObservable = new Subject<Unit>(); and then call resubscribeObservable.OnNext(Unit.Default); every time you want to resubscribe.

Alternatively, if you have some event, say a user clicking a button, then you could use an observable based on that event as your resubscribeObservable.
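As a rough end-to-end check of the idea, the sketch below uses a running count as a stand-in for the real Scan, and a `readings` subject as a stand-in for the outside source (both are assumptions for the example, as is the `ResubscribeSketch` name). Scan keeps its state per subscription, so each resubscription starts counting from scratch.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ResubscribeSketch
{
    public static List<int> Run()
    {
        var readings = new Subject<int>();               // stand-in for the hot 'outside' source
        var outsideObservable =
            readings.Scan(0, (count, _) => count + 1);   // running count; state is per subscription
        var resubscribeObservable = new Subject<Unit>();
        var seen = new List<int>();

        using var subscription =
            resubscribeObservable
                .Select(_ => outsideObservable)
                .Switch()
                .Subscribe(seen.Add);

        resubscribeObservable.OnNext(Unit.Default);      // first subscription to outsideObservable
        readings.OnNext(10);                             // count 1
        readings.OnNext(20);                             // count 2
        resubscribeObservable.OnNext(Unit.Default);      // old Scan state is discarded
        readings.OnNext(30);                             // count 1 again

        return seen;                                     // contains 1, 2, 1
    }
}
```

Note that without a StartWith, nothing is subscribed to outsideObservable until resubscribeObservable produces its first value.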

Integrating suggestions from the comments, this would look something like:

var factory = Observable.Defer(() => outsideObservable);

var resetterObservable = new Subject<Unit>();

var resettableObservable = 
       resetterObservable
           .StartWith(Unit.Default)
           .Select(_ => factory)
           .Switch()
           .Publish()
           .RefCount();

The Publish().RefCount() is just to protect the outsideObservable from multiple simultaneous subscriptions.
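A quick way to see the effect of Publish().RefCount() is to count how many subscriptions reach the source. In this sketch the counter, the `readings` subject and the `SharedSketch` name are all assumptions added for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedSketch
{
    public static (int Subscriptions, List<int> First, List<int> Second) Run()
    {
        var subscriptions = 0;
        var readings = new Subject<int>();               // stand-in for the outside source
        var factory = Observable.Defer(() =>
        {
            subscriptions++;                             // counts subscriptions that reach the source
            return readings.AsObservable();
        });

        var resetterObservable = new Subject<Unit>();
        var resettableObservable =
            resetterObservable
                .StartWith(Unit.Default)
                .Select(_ => factory)
                .Switch()
                .Publish()
                .RefCount();

        var first = new List<int>();
        var second = new List<int>();
        using var a = resettableObservable.Subscribe(first.Add);
        using var b = resettableObservable.Subscribe(second.Add);

        readings.OnNext(1);                              // both observers receive it

        return (subscriptions, first, second);           // subscriptions is 1, not 2
    }
}
```

Dropping the Publish().RefCount() pair from the chain would give each observer its own subscription to the source, and its own reset state.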

Upvotes: 1
