Reputation: 3015
How can I clear the buffer on a ReplaySubject?
Periodically I need to clear the buffer (as an end of day event in my case) to prevent the ReplaySubject from continually growing and eventually eating all the memory.
Ideally I want to keep the same ReplaySubject, as the client subscriptions are still good.
Upvotes: 29
Views: 17338
Reputation: 477
I don't know about C#, but I managed to get this done with the ReplaySubject in RxDart. RxDart's ReplaySubject uses a Queue to cache events, so I modified the class to use a List instead and added an onRemove method that removes a given event from the buffer.
Original ReplaySubject:
class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
  final Queue<T> _queue;
  final int _maxSize;

  /// Constructs a [ReplaySubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// See also [StreamController.broadcast]
  factory ReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );

    final queue = Queue<T>();

    return ReplaySubject<T>._(
      controller,
      Rx.defer<T>(
        () => controller.stream.startWithMany(queue.toList(growable: false)),
        reusable: true,
      ),
      queue,
      maxSize,
    );
  }

  ReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._queue,
    this._maxSize,
  ) : super(controller, stream);

  @override
  void onAdd(T event) {
    if (_queue.length == _maxSize) {
      _queue.removeFirst();
    }
    _queue.add(event);
  }

  @override
  List<T> get values => _queue.toList(growable: false);
}
Modified ReplaySubject:
class ModifiedReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
  final List<T> _list;
  final int _maxSize;

  /// Constructs a [ModifiedReplaySubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// See also [StreamController.broadcast]
  factory ModifiedReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );

    final queue = List<T>();

    return ModifiedReplaySubject<T>._(
      controller,
      Rx.defer<T>(
        () => controller.stream.startWithMany(queue.toList(growable: false)),
        reusable: true,
      ),
      queue,
      maxSize,
    );
  }

  ModifiedReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._list,
    this._maxSize,
  ) : super(controller, stream);

  @override
  void onAdd(T event) {
    if (_list.length == _maxSize) {
      _list.removeAt(0);
    }
    _list.add(event);
  }

  void onRemove(T event) {
    _list.remove(event);
  }

  @override
  List<T> get values => _list.toList(growable: false);
}
Upvotes: 0
Reputation: 1688
You likely already have an Observable source of data, in which case here is another solution. It composes existing Rx constructs rather than building your own ISubject, which I'm personally wary of.
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>
{
    private readonly IConnectableObservable<IObservable<TSource>> _underlying;
    private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();

    public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
    {
        // Each trigger (plus one initial tick) creates and connects a fresh Replay of the source.
        _underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
            .Select(_ =>
            {
                var underlyingReplay = src.Replay();
                // Assigning to the SerialDisposable disposes the previous replay's connection.
                _replayConnectDisposable.Disposable = underlyingReplay.Connect();
                return underlyingReplay;
            })
            .Replay(1);
    }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        // Switch follows only the most recent replay.
        return _underlying.Switch().Subscribe(observer);
    }

    public IDisposable Connect()
    {
        return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
    }
}
If you add the following extension method to your ObservableEx:
public static class ObservableEx
{
    public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
    {
        return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
    }
}
then you can call .ReplayWithReset(...) on your source, passing your reset trigger Observable. The trigger could be a timer or any other Observable.
var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();
Connect() behaves the same way as it would on a Replay.
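To make the reset behavior concrete, here is a minimal usage sketch. It assumes ClearableReplaySubject and ObservableEx from above are compiled into the same project and that the System.Reactive package is referenced. The names source, endOfDay, early and late are hypothetical, and a manually fired Subject stands in for the timer so that the sequence of events is easy to follow.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical source and reset trigger. In a real application the trigger
// could be a timer, for example Observable.Interval(TimeSpan.FromDays(1)).
var source = new Subject<int>();
var endOfDay = new Subject<Unit>();

var replay = source.ReplayWithReset(endOfDay);
using var connection = replay.Connect();

var early = new List<int>();
var late = new List<int>();

source.OnNext(1);                             // buffered by the first replay
using var s1 = replay.Subscribe(early.Add);   // the early subscriber is replayed 1
endOfDay.OnNext(Unit.Default);                // reset: a fresh, empty replay takes over
source.OnNext(2);                             // buffered by the second replay only
using var s2 = replay.Subscribe(late.Add);    // the late subscriber is replayed 2, not 1

Console.WriteLine("early: " + string.Join(",", early));
Console.WriteLine("late:  " + string.Join(",", late));
```

The early subscription stays alive across the reset, while a subscriber arriving after the reset sees none of the items buffered before it.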
Upvotes: 0
Reputation: 29776
ReplaySubject doesn't offer a means to clear the buffer, but there are several constructor overloads that constrain its buffer in different ways: by a maximum TimeSpan that items are retained for, by a maximum item count, or by a combination of both.
This was quite an interesting problem, so I decided to see how easy it would be to implement a variation of ReplaySubject that you can clear, using existing subjects and operators (as these are quite robust). It turns out it was reasonably straightforward.
I've run this through a memory profiler to check it does the right thing. Call Clear() to flush the buffer; otherwise it works just like a regular unbounded ReplaySubject:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        // The outer subject remembers only the most recent inner subject.
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        // Complete the current buffer and swap in a fresh, empty one.
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}
Respect the usual rules (as with any Subject) and don't call methods on this class concurrently, including Clear(). You could add synchronization locks trivially if needed.
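One possible way to add that synchronization is a thin wrapper that takes a single lock around every call that touches the current buffer. This is only a sketch: SynchronizedRollingReplaySubject is a hypothetical name, not part of the original answer, and it assumes RollingReplaySubject<T> from above is in the same project.

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical wrapper, not part of the original answer.
public sealed class SynchronizedRollingReplaySubject<T> : ISubject<T>
{
    private readonly object _gate = new object();
    private readonly RollingReplaySubject<T> _inner = new RollingReplaySubject<T>();

    // Every call that reads or swaps the current inner subject takes the same lock,
    // so Clear() cannot interleave with OnNext/OnError/OnCompleted.
    public void Clear() { lock (_gate) { _inner.Clear(); } }
    public void OnNext(T value) { lock (_gate) { _inner.OnNext(value); } }
    public void OnError(Exception error) { lock (_gate) { _inner.OnError(error); } }
    public void OnCompleted() { lock (_gate) { _inner.OnCompleted(); } }

    // Subscribe is left unlocked here, on the assumption that the underlying
    // ReplaySubject instances already cope with subscriptions racing against OnNext.
    public IDisposable Subscribe(IObserver<T> observer) => _inner.Subscribe(observer);
}
```

Note that observers are invoked while the lock is held, so a slow observer will block other producers. Whether that trade-off is acceptable depends on your workload.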
It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects) holds a buffer of exactly one inner ReplaySubject (_currentSubject), and it is populated on construction.
The OnXXX methods call through to the _currentSubject ReplaySubject.
Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects). Because the buffer size of _subjects is just 1, new subscribers acquire the events of only the most recent ReplaySubject onwards.
Whenever we need to "clear the buffer", the existing _currentSubject is completed (OnCompleted) and a new ReplaySubject is added to _subjects and becomes the new _currentSubject.
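The mechanism described above can be exercised with a short usage sketch. It assumes RollingReplaySubject<T> from this answer is in the same project and that the System.Reactive package is referenced. The variable names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

var subject = new RollingReplaySubject<int>();
var early = new List<int>();
var late = new List<int>();

using var s1 = subject.Subscribe(early.Add);  // subscribed before any Clear()
subject.OnNext(1);
subject.OnNext(2);
subject.Clear();                              // completes the old inner subject, starts a new one
subject.OnNext(3);
using var s2 = subject.Subscribe(late.Add);   // replayed only what was buffered since Clear()
subject.OnNext(4);

Console.WriteLine("early: " + string.Join(",", early));
Console.WriteLine("late:  " + string.Join(",", late));
```

The early subscriber keeps its subscription across Clear() and continues to receive new values, while the late subscriber never sees the values that were buffered before the clear.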
Following @Brandon's suggestion, I created a version of RollingReplaySubject that uses either a TimeSpan or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07
Upvotes: 27