Cel
Cel

Reputation: 6669

store retrieve IObservable subscription state in Rx

[ this question is in the realm of Reactive Extensions (Rx) ]

A subscription that needs to continue on application restart

int nValuesBeforeOutput = 123;

myStream.Buffer(nValuesBeforeOutput).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

Now I need to serialize and deserialize the state of this subscription so that next time the application is started the buffer count does NOT start from zero, but from whatever the buffer count got to before application exit.



From Answer to Solution

Based on Paul Betts approach, here's a semi-generalizable implementation that worked in my initial testing

Use

int nValuesBeforeOutput = 123;

var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

Extension methods

    private static bool _alreadyRecording;

    public static IObservable<T> Record<T>(this IObservable<T> input,
                                           IRepositor repositor) 
    {
        IObservable<T> output = input;
        List<T> records = null;
        if (repositor.Deserialize(ref records))
        {
            ISubject<T> history = new ReplaySubject<T>();
            records.ForEach(history.OnNext);
            output = input.Merge(history);
        }
        if (!_alreadyRecording)
        {
            _alreadyRecording = true;
            input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
        }
        return output;
    }

    public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
                                                 IRepositor repositor)
    {
        input.Subscribe(i => repositor.Clear());
        return input;
    }

Notes

Upvotes: 6

Views: 478

Answers (1)

Ana Betts
Ana Betts

Reputation: 74692

There is no general solution for this, and making one would be NonTrivial™. The closest thing you can do is make myStream some sort of replay Observable (i.e. instead of serializing the state, serialize the state of myStream and redo the work to get you back to where you were).

Upvotes: 1

Related Questions