Reputation: 30202
I have a web component which subscribes to a stream.
Since the web component is re-created each time it's displayed, I have to clean up the subscriber and redo it.
Right now I am adding all subscriptions to a list, and in the removed() life-cycle method I cancel them:
subscriptions.forEach((sub) => sub.cancel());
Now, to the problem: when the web component isn't displayed, there's no one listening to the stream. The issue is that the component is missing data/events when it's not displayed.
What I need is buffering. Events need to be buffered and sent at once when a listener is registered. According to the documentation, buffering happens until a listener is registered:
"The controller will buffer all incoming events until the subscriber is registered."
This works, but the problem is that the listener will at some point be removed and re-registered, and it appears that this does not trigger buffering.
It appears that buffering happens only initially, before the first listener is registered, and not later on, even when all listeners are gone.
So the question is: how do I buffer in this situation, where listeners may come and go?
Upvotes: 10
Views: 3300
Reputation: 8128
Note: normally you shouldn't be able to resubscribe to a Stream that has already been closed. This seems to be a bug we forgot to fix.
I'm unfamiliar with web-components but I hope I'm addressing your problem with the following suggestion.
One way (and there are of course many) would be to create a new Stream for every subscriber (as HTML events do) and to pause the original stream whenever nobody is listening.
Say origin is the original Stream. Then implement a stream getter that returns a new Stream that is linked to origin:
Untested code.
import 'dart:async';

late Stream origin; // Must be assigned before the first listener arrives.
StreamSubscription? _subscription;
final _listeners = <StreamController>{};

void _addListener(StreamController controller) {
  _listeners.add(controller);
  // Subscribe to origin lazily, when the first listener shows up.
  _subscription ??= origin.listen((event) {
    // When we emit the event we want listeners to be able to unsubscribe
    // or add new listeners. In order to avoid ConcurrentModificationErrors
    // we need to make sure that the _listeners set is not modified while
    // we are iterating over it. Here we just create a copy with toList().
    // Alternatively (more efficient) we could also queue subscription
    // modification requests and do them after the loop.
    for (final c in _listeners.toList()) {
      c.add(event);
    }
  });
  _subscription!.resume(); // Just in case it was paused.
}

void _removeListener(StreamController controller) {
  _listeners.remove(controller);
  // Pause instead of cancelling, so origin keeps the events for later.
  if (_listeners.isEmpty) _subscription?.pause();
}

Stream get stream {
  late StreamController controller;
  controller = StreamController(
      onListen: () => _addListener(controller),
      onCancel: () => _removeListener(controller));
  return controller.stream;
}
If you need to buffer events immediately, you need to start the subscription right away, not lazily as in the sample code.
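A rough sketch of that eager variant, equally untested: it subscribes to origin up front and pauses the subscription straight away, so events should pile up until the first listener arrives. The class name BufferingRelay is made up for illustration, and the sketch assumes origin buffers events while its subscription is paused, as a stream coming from a StreamController does.

```dart
import 'dart:async';

/// Hypothetical helper (the name is not from any library): relays [origin]
/// to short-lived listeners and keeps the origin subscription paused
/// whenever nobody is listening.
class BufferingRelay<T> {
  BufferingRelay(Stream<T> origin) {
    // Subscribe eagerly, then pause at once so that events wait for the
    // first listener instead of being dropped.
    _subscription = origin.listen(_emit)..pause();
  }

  late final StreamSubscription<T> _subscription;
  final _listeners = <StreamController<T>>{};

  void _emit(T event) {
    // Iterate over a copy so listeners may come and go during delivery.
    for (final c in _listeners.toList()) {
      c.add(event);
    }
  }

  /// Returns a fresh single-subscription stream on every call.
  Stream<T> get stream {
    late final StreamController<T> controller;
    controller = StreamController<T>(
      onListen: () {
        _listeners.add(controller);
        _subscription.resume(); // Does nothing if not currently paused.
      },
      onCancel: () {
        _listeners.remove(controller);
        if (_listeners.isEmpty) _subscription.pause();
      },
    );
    return controller.stream;
  }
}

Future<void> main() async {
  final source = StreamController<int>();
  final relay = BufferingRelay<int>(source.stream);

  source..add(1)..add(2); // Nobody is listening yet.
  final sub = relay.stream.listen(print);
  await Future<void>.delayed(Duration.zero);

  await sub.cancel(); // E.g. the component was removed.
  source.add(3); // Added while there is no listener.
  relay.stream.listen(print); // E.g. the component is displayed again.
}
```

In the component you would then call relay.stream.listen(...) each time it is created and cancel that subscription in removed(), as you already do. Note that the buffer is unbounded, so a component that stays hidden for a long time will accumulate every event in memory.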
Upvotes: 12