CodingHero

Reputation: 3015

How can I clear the buffer on a ReplaySubject?

Periodically I need to clear the buffer (as an end-of-day event in my case) to prevent the ReplaySubject from growing continually and eventually eating all the memory.

Ideally I want to keep the same ReplaySubject, since the existing client subscriptions are still valid.

Upvotes: 29

Views: 17338

Answers (3)

Mukul Pathak

Reputation: 477

I don't know about C#, but I managed to get this done with the ReplaySubject in RxDart. That ReplaySubject uses a Queue to cache events, so I modified the ReplaySubject class:

  1. I changed the Queue to a List.
  2. I added an onRemove method, which removes an event from the cached list.

Original ReplaySubject:

class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final Queue<T> _queue;
final int _maxSize;

/// Constructs a [ReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
}) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
        onListen: onListen,
        onCancel: onCancel,
        sync: sync,
    );

    final queue = Queue<T>();

    return ReplaySubject<T>._(
        controller,
        Rx.defer<T>(
                () => controller.stream.startWithMany(queue.toList(growable: false)),
            reusable: true,
        ),
        queue,
        maxSize,
    );
}

ReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._queue,
    this._maxSize,
    ) : super(controller, stream);

@override
void onAdd(T event) {
    if (_queue.length == _maxSize) {
        _queue.removeFirst();
    }

    _queue.add(event);
}

@override
List<T> get values => _queue.toList(growable: false);
}

Modified ReplaySubject:

class ModifiedReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final List<T> _list;
final int _maxSize;

/// Constructs a [ModifiedReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ModifiedReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
}) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
        onListen: onListen,
        onCancel: onCancel,
        sync: sync,
    );

    // A list literal; the unnamed List() constructor is not available in current Dart.
    final queue = <T>[];

    return ModifiedReplaySubject<T>._(
        controller,
        Rx.defer<T>(
                () => controller.stream.startWithMany(queue.toList(growable: false)),
            reusable: true,
        ),
        queue,
        maxSize,
    );
}

ModifiedReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._list,
    this._maxSize,
    ) : super(controller, stream);

@override
void onAdd(T event) {
    if (_list.length == _maxSize) {
        _list.removeAt(0);
    }

    _list.add(event);
}

void onRemove(T event) {
    _list.remove(event);
}

@override
List<T> get values => _list.toList(growable: false);
}

Upvotes: 0

Lee Oades

Reputation: 1688

You probably already have an Observable source of data, in which case here is another solution. It composes existing Rx constructs rather than building your own ISubject, which I'm personally wary of.

public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>
{
    private readonly IConnectableObservable<IObservable<TSource>> _underlying;
    private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();

    public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
    {
        _underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
            .Select(_ =>
            {
                var underlyingReplay = src.Replay();
                _replayConnectDisposable.Disposable = underlyingReplay.Connect();
                return underlyingReplay;
            })
            .Replay(1);
    }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        return _underlying.Switch().Subscribe(observer);
    }

    public IDisposable Connect()
    {
        return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
    }
}

If you add the following extension method to your ObservableEx:

public static class ObservableEx
{
    public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
    {
        return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
    }
}

then you can take your source and add .ReplayWithReset(...) with your reset trigger Observable. This could be a timer or whatever.

var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();

Connect() behaves the same way as it would on a regular Replay.

Upvotes: 0

James World

Reputation: 29776

ReplaySubject doesn't offer a means to clear the buffer, but there are several overloads to constrain its buffers in different ways:

  • A maximum TimeSpan that items are retained for
  • A maximum item count
  • A combination of the above, which drops items as soon as either condition is met.
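As a rough illustration of those overloads, here is a minimal sketch using the count, time, and combined constructors of ReplaySubject<T> from System.Reactive. The class name BoundedReplayDemo and the specific limits (2 items, 1000 items, 24 hours) are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class BoundedReplayDemo
{
    // Returns what a late subscriber sees from a count-bounded subject.
    public static List<int> Run()
    {
        // Keep at most the 2 most recent items.
        var byCount = new ReplaySubject<int>(2);
        byCount.OnNext(1);
        byCount.OnNext(2);
        byCount.OnNext(3);

        // A late subscriber only receives what is still buffered.
        var seen = new List<int>();
        byCount.Subscribe(seen.Add);
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints "2,3"

        // Keep items for at most 24 hours (illustrative window).
        var byAge = new ReplaySubject<int>(TimeSpan.FromHours(24));

        // Keep at most 1000 items, none older than 24 hours (illustrative limits).
        var byBoth = new ReplaySubject<int>(1000, TimeSpan.FromHours(24));
    }
}
```

If a bound like this is acceptable for your end-of-day case, it may remove the need for an explicit clear altogether.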

A Clearable ReplaySubject

This was quite an interesting problem. I decided to see how easy it would be to implement a variation of ReplaySubject that you can clear, using existing subjects and operators (as these are quite robust). It turned out to be reasonably straightforward.

I've run this through a memory profiler to check it does the right thing. Call Clear() to flush the buffer, otherwise it works just like a regular unbounded ReplaySubject:

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}

Respect the usual rules (as with any Subject) and don't call methods on this class concurrently, including Clear(). You could add synchronization locks trivially if needed.
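For what it's worth, one way the locking might look is sketched below. This is an assumption about how you could do it, not something I have profiled: the class name SynchronizedRollingReplaySubject and the _gate field are invented for the example, and every member (including Subscribe) simply takes the same lock.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical lock-protected variant of RollingReplaySubject<T>.
public sealed class SynchronizedRollingReplaySubject<T> : ISubject<T>
{
    private readonly object _gate = new object();
    private readonly ReplaySubject<IObservable<T>> _subjects =
        new ReplaySubject<IObservable<T>>(1);
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public SynchronizedRollingReplaySubject()
    {
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        lock (_gate)
        {
            // Complete the old inner subject, then swap in a fresh one.
            _currentSubject.OnCompleted();
            _currentSubject = new ReplaySubject<T>();
            _subjects.OnNext(_currentSubject);
        }
    }

    public void OnNext(T value)
    {
        lock (_gate) { _currentSubject.OnNext(value); }
    }

    public void OnError(Exception error)
    {
        lock (_gate) { _currentSubject.OnError(error); }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            _currentSubject.OnCompleted();
            _subjects.OnCompleted();
            _currentSubject = new Subject<T>();
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Locked so that a subscription cannot interleave with Clear().
        lock (_gate) { return _concatenatedSubjects.Subscribe(observer); }
    }
}
```

Note that observers are called while the lock is held, so an observer that blocks on another thread which is itself waiting to call into the subject can deadlock. Whether that trade-off is acceptable depends on your observers.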

It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects) holds a buffer of exactly one inner ReplaySubject (_currentSubject), and it is populated on construction.

The OnXXX methods call through to the _currentSubject ReplaySubject.

Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects). Because the buffer size of _subjects is just 1, new subscribers acquire the events of only the most recent ReplaySubject onwards.

Whenever we need to "clear the buffer", the existing _currentSubject is completed (via OnCompleted) and a new ReplaySubject is added to _subjects and becomes the new _currentSubject.
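To make the mechanism concrete, here is a small sketch that performs the same steps by hand, without the wrapper class. The names NestedReplayDemo, outer, first and second are made up for the example; the Rx calls are the same ones the class uses.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class NestedReplayDemo
{
    public static (List<int> Early, List<int> Late) Run()
    {
        // Outer subject remembers only the most recent inner subject.
        var outer = new ReplaySubject<IObservable<int>>(1);
        IObservable<int> stream = outer.Concat();

        var first = new ReplaySubject<int>();
        outer.OnNext(first);

        // Subscribed before any values arrive.
        var early = new List<int>();
        stream.Subscribe(early.Add);

        first.OnNext(1);
        first.OnNext(2);

        // "Clear": complete the old inner subject and push a fresh one.
        first.OnCompleted();
        var second = new ReplaySubject<int>();
        outer.OnNext(second);

        second.OnNext(3);

        // Subscribed after the clear: only the newest inner subject is replayed.
        var late = new List<int>();
        stream.Subscribe(late.Add);

        return (early, late);
    }

    public static void Main()
    {
        var (early, late) = Run();
        Console.WriteLine(string.Join(",", early)); // prints "1,2,3"
        Console.WriteLine(string.Join(",", late));  // prints "3"
    }
}
```

The early subscriber keeps its subscription across the clear and sees everything, while the late subscriber never sees the values buffered before it.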

Enhancements

Following @Brandon's suggestion, I created a version of RollingReplaySubject that uses either a TimeSpan or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07

Upvotes: 27
