borovikpe
borovikpe

Reputation: 71

How to re-subscribe to sequence in particular point?

I'm trying to solve the following: a) subscriber receives events from IObservable for some time. Then it unsubscribes, do some stuff and then subscribe again. Here it should start receiving events from exactly the same point where unsubscription was performed. b) Such behavior is desirable for multiple subscribers model. E.g. when one has unsubscribed, others should continue receiving events.

Are there any suggestions from the RX side?

Thanks in advance!

Upvotes: 0

Views: 287

Answers (2)

Enigmativity
Enigmativity

Reputation: 117175

Here's a reasonably simple Rx way to do what you want copied from my answer to this other question. I've created an extension method called Pausable that takes a source observable and a second observable of boolean that pauses or resumes the observable.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

Upvotes: 1

Ilian
Ilian

Reputation: 5355

It sounds like you need a "pausable" stream. Assuming that only 1 subscriber will handle the values at a time (while the other subscribers just wait), this solution is probably what you need.

Upvotes: 0

Related Questions