Kai Sellgren

Reputation: 30202

How to buffer stream events?

I have a web component which subscribes to a stream.

Since the web component is re-created each time it's displayed, I have to clean up the subscriber and redo it.

Right now I am adding all subscriptions to a list, and in the removed() life-cycle method I'm doing:

subscriptions.forEach((sub) => sub.cancel());

Now, to the problem: when the web component isn't displayed, there's no one listening to the stream. The issue is that the component is missing data/events when it's not displayed.

What I need is buffering. Events need to be buffered and sent at once when a listener is registered. According to the documentation, buffering happens until a listener is registered:

The controller will buffer all incoming events until the subscriber is registered.

This works, but the problem is that the listener will at some point be removed and re-registered, and it appears this does not trigger buffering.

It appears that buffering happens only initially, not later on even if all listeners are gone.

So the question is: how do I buffer in this situation where listeners may be gone and back?

Upvotes: 10

Views: 3300

Answers (1)

Florian Loitsch

Reputation: 8128

Note: normally you shouldn't be able to resubscribe to a Stream that has already been closed. This seems to be a bug we forgot to fix.

I'm unfamiliar with web-components but I hope I'm addressing your problem with the following suggestion.

One way (and there are of course many) would be to create a new Stream for every subscriber (like html-events do) that pauses the original stream.
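
The suggestion relies on a paused subscription holding on to events until it is resumed. A minimal sketch of that mechanism, assuming the original stream comes from a plain single-subscription StreamController (the name demo is made up for illustration):

```dart
import 'dart:async';

// Hypothetical demo function: shows that events added while the
// subscription is paused are delivered once it is resumed.
Future<List<int>> demo() async {
  final received = <int>[];
  final origin = StreamController<int>();
  final sub = origin.stream.listen(received.add);

  sub.pause();
  origin.add(1);
  origin.add(2);
  // Give the event loop a turn; nothing is delivered while paused.
  await Future<void>.delayed(Duration.zero);

  sub.resume();
  // Give the event loop another turn so the buffered events are delivered.
  await Future<void>.delayed(Duration.zero);

  await sub.cancel();
  await origin.close();
  return received;
}

Future<void> main() async {
  print(await demo()); // prints [1, 2]
}
```

Whether events are really buffered depends on the source honoring pause; a source that ignores pause requests leaves the buffering to the subscription itself, which can grow without bound.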

Say origin is the original Stream. Then implement a stream getter that returns a new Stream that is linked to origin:

Untested code.

import 'dart:async';

late Stream origin;  // The original stream; assign it before using `stream`.
StreamSubscription? _subscription;
final _listeners = <StreamController>{};

void _addListener(StreamController controller) {
  _listeners.add(controller);
  _subscription ??= origin.listen((event) {
    // When we emit the event we want listeners to be able to unsubscribe
    // or add new listeners. In order to avoid ConcurrentModificationErrors
    // we need to make sure that the _listeners set is not modified while
    // we are iterating over it. Here we just create a copy with toList().
    // Alternatively (more efficient) we could also queue subscription
    // modification requests and apply them after the loop.
    for (final c in _listeners.toList()) {
      c.add(event);
    }
  });
  _subscription!.resume();  // Just in case it was paused.
}

void _removeListener(StreamController controller) {
  _listeners.remove(controller);
  // Pausing (rather than cancelling) keeps events buffered until resume().
  if (_listeners.isEmpty) _subscription?.pause();
}

Stream get stream {
  late final StreamController controller;
  controller = StreamController(
      onListen: () => _addListener(controller),
      onCancel: () => _removeListener(controller));
  return controller.stream;
}

If you need to buffer events immediately you need to start the subscription right away and not lazily as in the sample code.
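
A rough sketch of that eager variant, under the same assumptions as the code above. The class name BufferedRelay and the demo function are made up for illustration and are not part of dart:async. The subscription to the original stream is created in the constructor and kept paused whenever nobody is listening:

```dart
import 'dart:async';

/// Hypothetical helper: subscribes to [origin] right away and keeps that
/// subscription paused while there are no listeners.
class BufferedRelay<T> {
  final _listeners = <StreamController<T>>{};
  late final StreamSubscription<T> _subscription;

  BufferedRelay(Stream<T> origin) {
    _subscription = origin.listen((event) {
      // Iterate over a copy so listeners may come and go during delivery.
      for (final c in _listeners.toList()) {
        c.add(event);
      }
    });
    _subscription.pause(); // Nobody is listening yet: start buffering.
  }

  /// Returns a fresh stream for each subscriber.
  Stream<T> get stream {
    late final StreamController<T> controller;
    controller = StreamController<T>(
      onListen: () {
        _listeners.add(controller);
        if (_subscription.isPaused) _subscription.resume();
      },
      onCancel: () {
        _listeners.remove(controller);
        if (_listeners.isEmpty) _subscription.pause();
      },
    );
    return controller.stream;
  }
}

// Hypothetical usage: each listener receives what was buffered while
// no listener was attached.
Future<String> demo() async {
  final origin = StreamController<int>();
  final relay = BufferedRelay<int>(origin.stream);

  origin.add(1); // No listener yet: buffered.
  final first = <int>[];
  final sub1 = relay.stream.listen(first.add);
  await Future<void>.delayed(Duration.zero);
  await sub1.cancel();

  origin.add(2); // Listener is gone: buffered again.
  final second = <int>[];
  final sub2 = relay.stream.listen(second.add);
  await Future<void>.delayed(Duration.zero);
  await sub2.cancel();

  return '$first $second';
}

Future<void> main() async {
  print(await demo()); // prints [1] [2]
}
```

Note that with several simultaneous listeners, the buffered events go only to whichever listener triggers the resume; listeners that subscribe later see only newer events.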

Upvotes: 12
