Scorb

Reputation: 1850

Can I listen to a StreamController's stream multiple times (but not more than one subscription at a time)?

I am using a StreamController in Dart. I would like to stop listening to the controller's stream and then start listening again later. I do not need multiple listeners at once. I only need to listen to the stream, cancel that subscription, and then attach a new listener afterwards.

I have created a minimal example where I try to cancel the initial subscription, and then listen to the stream again, but I still get a bad state error.

import 'dart:async';

void main(List<String> arguments) async {
  var controller = StreamController<String>();
  var sub = controller.stream.listen((event) { });
  await sub.cancel();
  controller.stream.listen((event) { }); // throws bad state
}

Upvotes: 3

Views: 5218

Answers (1)

lrn

Reputation: 71623

The StreamController class creates either a single-subscription stream (which can only ever be listened to once, even after that one subscription has been cancelled) or a broadcast stream (if created using StreamController.broadcast), which can be listened to multiple times.

For your described use, you'd want the broadcast variant.

You probably want to avoid sending events while that controller has no listeners. Broadcast streams can emit events even when there are no listeners; those events are simply broadcast into the void. The onCancel and onListen callbacks of a broadcast stream controller are called when the last listener is cancelled and when the first (new) listener is added, respectively.
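To make the "broadcast into the void" point concrete, here is a minimal sketch. The function name collectWhileListening and the event strings are made up for illustration; the only APIs used are StreamController.broadcast, hasListener, listen, cancel and close. Events added while nobody is subscribed are not buffered for a later listener.

```dart
import 'dart:async';

/// Illustrative only: records which events actually reach a listener.
Future<List<String>> collectWhileListening() async {
  final controller = StreamController<String>.broadcast();
  final received = <String>[];

  // No listener yet, so this event is dropped.
  controller.add('lost');

  final first = controller.stream.listen(received.add);
  controller.add('first');
  // Give the asynchronously delivered event a chance to arrive.
  await Future<void>.delayed(Duration.zero);
  await first.cancel();

  // One way to avoid emitting into the void: check hasListener first.
  if (controller.hasListener) {
    controller.add('skipped');
  }

  final second = controller.stream.listen(received.add);
  controller.add('second');
  await Future<void>.delayed(Duration.zero);
  await second.cancel();

  await controller.close();
  return received;
}

Future<void> main() async {
  print(await collectWhileListening()); // [first, second]
}
```

Instead of checking hasListener at every call site, you could also start and stop the event source from the onListen and onCancel callbacks.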

The StreamController.broadcast controller doesn't prevent you from having multiple simultaneous listeners, but that's something you should be able to avoid just by being careful.

Example:

import 'dart:async';

void main(List<String> arguments) async {
  var controller = StreamController<String>.broadcast();
  controller.onListen = () {
    print("Active");
  };
  controller.onCancel = () {
    print("Inactive");
  };
  var sub = controller.stream.listen((event) { }); // "Active"
  await sub.cancel(); // "Inactive"
  controller.stream.listen((event) { }); // "Active"
}
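One possible way of "being careful" is to route every subscription through a small holder that cancels the previous subscription before starting the next. The class SingleSlot below is a hypothetical helper written for this answer, not part of dart:async, and it does not guard against overlapping concurrent calls to listen.

```dart
import 'dart:async';

/// Hypothetical helper: keeps at most one active subscription on [stream]
/// by cancelling the previous subscription before creating a new one.
class SingleSlot<T> {
  SingleSlot(this.stream);

  final Stream<T> stream;
  StreamSubscription<T>? _current;

  /// Cancels any existing subscription, then starts a new one.
  Future<void> listen(void Function(T event) onData) async {
    await _current?.cancel();
    _current = stream.listen(onData);
  }

  /// Cancels the active subscription, if there is one.
  Future<void> cancel() async {
    await _current?.cancel();
    _current = null;
  }
}

Future<List<String>> demo() async {
  final controller = StreamController<String>.broadcast();
  final slot = SingleSlot<String>(controller.stream);
  final log = <String>[];

  await slot.listen((e) => log.add('a:$e'));
  controller.add('1');
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  // Replaces the first listener instead of adding a second one.
  await slot.listen((e) => log.add('b:$e'));
  controller.add('2');
  await Future<void>.delayed(Duration.zero);

  await slot.cancel();
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await demo()); // [a:1, b:2]
}
```

This only enforces the rule for code that goes through the holder; anything that calls controller.stream.listen directly can still add a second listener.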

If you really want to insist on "one listener at a time", you can wrap the stream in something like:

import "dart:async";

/// Allows a stream to be listened to multiple times.
///
/// Returns a new stream which has the same events as [source],
/// but which can be listened to more than once.
/// Only allows one listener at a time, but when a listener
/// cancels, another can start listening and take over the stream.
///
/// If the [source] is a broadcast stream, the listener on
/// the source is cancelled while there is no listener on the
/// returned stream.
/// If the [source] is not a broadcast stream, the subscription
/// on the source stream is maintained, but paused, while there
/// is no listener on the returned stream.
///
/// Only listens on the [source] stream when the returned stream
/// is listened to.
Stream<T> resubscribeStream<T>(Stream<T> source) {
  MultiStreamController<T>? current;
  StreamSubscription<T>? sourceSubscription;
  bool isDone = false;
  void add(T value) {
    current!.addSync(value);
  }

  void addError(Object error, StackTrace stack) {
    current!.addErrorSync(error, stack);
  }

  void close() {
    isDone = true;
    current!.close();
    current = null;
    sourceSubscription = null;
  }

  return Stream<T>.multi((controller) {
    if (isDone) {
      controller.close(); // Or throw StateError("Stream has ended");
      return;
    }
    if (current != null) throw StateError("Has listener");
    current = controller;
    var subscription = sourceSubscription ??=
        source.listen(add, onError: addError, onDone: close);
    subscription.resume();
    controller
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = () {
        current = null;
        if (source.isBroadcast) {
          sourceSubscription = null;
          return subscription.cancel();
        }
        subscription.pause();
        return null;
      };
  });
}

Upvotes: 6
