andre

Reputation: 1728

Do something when someone subscribes?

I have a broadcast stream that exposes the data stream of a BLE device.

To get data out of that stream, I first need to send a request to the device.

Stream<Data> dataStream() {
  sendDataRequestToDevice();
  return _broadcastController.stream;
}

The problem is that everything is asynchronous: by the time the stream is returned and a listener attaches, the event sent by the device has very likely already been emitted and missed. I am looking for something like:

Stream<Data> dataStream() {
  return _broadcastController.stream
    .doOnSubscribe(() => sendDataRequestToDevice()); // stolen from rxjava ;)
}

Is there something like this in the standard dart:async library, without using RxDart or similar? (I don't want to pull it in just for this purpose.)

Upvotes: 0

Views: 59

Answers (3)

lrn

Reputation: 71623

If you care how many subscribers you have, you probably shouldn't be using a broadcast stream. The underlying idea of a broadcast stream is that it broadcasts without knowing (or caring) who is listening. The onListen and onCancel callbacks were originally not part of a broadcast controller, and they break the model slightly. They do allow you to know when nobody is listening, but that is all.

In a situation like this, I'd make my own stream which records the listens and cancels.

import 'dart:async';

/// Forwards to [_source] and reports every listen and cancel via callbacks.
class _ListenStream<T> extends Stream<T> {
  final Stream<T> _source;
  final void Function()? _onListen;
  final void Function()? _onCancel;
  _ListenStream(this._source, this._onListen, this._onCancel);

  @override
  bool get isBroadcastStream => _source.isBroadcastStream;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    _onListen?.call();
    return _ListenSubscription<T>(
        _source.listen(onData,
            onError: onError, onDone: onDone, cancelOnError: cancelOnError),
        _onCancel);
  }
}

/// Delegates to the wrapped subscription and runs [_onCancel] after cancel.
class _ListenSubscription<T> implements StreamSubscription<T> {
  final StreamSubscription<T> _source;
  final void Function()? _onCancel;
  _ListenSubscription(this._source, this._onCancel);

  @override
  void onData(void Function(T data)? handleData) => _source.onData(handleData);
  @override
  void onError(Function? handleError) => _source.onError(handleError);
  @override
  void onDone(void Function()? handleDone) => _source.onDone(handleDone);

  @override
  void pause([Future<void>? resumeSignal]) => _source.pause(resumeSignal);
  @override
  void resume() => _source.resume();
  @override
  Future<E> asFuture<E>([E? futureValue]) => _source.asFuture<E>(futureValue);
  @override
  bool get isPaused => _source.isPaused;

  @override
  Future<void> cancel() {
    var future = _source.cancel();
    final onCancel = _onCancel;
    if (onCancel != null) future = future.whenComplete(onCancel);
    return future;
  }
}

Then, instead of providing the original stream, you give your clients the _ListenStream wrapper of the original stream, along with the callbacks you want called on listen and cancel.
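
For comparison, here is a shorter sketch of the same idea that uses a per-listener StreamController instead of the wrapper classes. It is a different technique, not the classes above: `doOnListen` is a hypothetical helper name, and the fake device in `main` is only a stand-in for `sendDataRequestToDevice()`. Each call returns a fresh single-subscription stream, so the callback runs once for every client that listens.

```dart
import 'dart:async';

/// Hypothetical helper: a single-subscription view of [source] that runs
/// [onListen] right after the listener has been attached to [source].
Stream<T> doOnListen<T>(Stream<T> source, void Function() onListen) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  controller = StreamController<T>(
    onListen: () {
      // Subscribe first so a reply to the request cannot be missed.
      subscription = source.listen(controller.add,
          onError: controller.addError, onDone: controller.close);
      onListen();
    },
    onPause: () => subscription?.pause(),
    onResume: () => subscription?.resume(),
    onCancel: () => subscription?.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  final device = StreamController<int>.broadcast();
  var requests = 0;

  // Stand-in for sendDataRequestToDevice(): the device answers asynchronously.
  void sendRequest() {
    requests++;
    Future(() => device.add(42));
  }

  final value = await doOnListen(device.stream, sendRequest).first;
  print('$requests request(s), received $value'); // 1 request(s), received 42
  await device.close();
}
```

The trade-off is that the returned stream is no longer a broadcast stream, so every client needs its own call to `doOnListen`.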

Upvotes: 1

andre

Reputation: 1728

For everyone else searching for a solution:

I don't think this is a good solution, but it does the job. Please tell me there's a better one!

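import 'dart:async';
import 'package:async/async.dart'; // provides StreamGroup

Stream<Data> dataStream() {
  // StreamGroup.merge listens to its inputs only once the merged stream is
  // listened to, so the request goes out at subscription time.
  return StreamGroup.merge([
    doOnSubscribe<Data>(() => sendDataRequestToDevice()),
    _broadcastController.stream,
  ]);
}

/// Returns an empty stream that calls [onSubscribe] when it is listened to
/// and then closes immediately.
Stream<T> doOnSubscribe<T>(void Function() onSubscribe) {
  late final StreamController<T> controller;
  controller = StreamController<T>(
    onListen: () {
      onSubscribe();
      controller.close();
    },
  );
  return controller.stream;
}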

Sadly, it doesn't solve my BLE case because of other issues.

Upvotes: 0

Günter Zöchbauer

Reputation: 657188

It seems you are looking for onListen:

https://api.dartlang.org/stable/2.1.0/dart-async/StreamController/onListen.html

_broadcastController.onListen = () {
  _broadcastController.add('foo');
};

You can also pass the handler to the constructor: https://api.dartlang.org/stable/2.1.0/dart-async/StreamController/StreamController.html
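
One caveat worth checking before relying on this: on a broadcast controller, onListen fires when the first listener subscribes, not for every listener, and it fires again only after all listeners have cancelled and a new one arrives. The sketch below counts the calls to show this; `countListenCalls` is a hypothetical name used only for illustration.

```dart
import 'dart:async';

/// Returns the onListen call count after two simultaneous listeners, and
/// again after both cancelled and a third listener subscribed.
Future<List<int>> countListenCalls() async {
  var listenCalls = 0;
  final controller = StreamController<String>.broadcast(
    onListen: () => listenCalls++,
  );

  final first = controller.stream.listen((_) {});
  final second = controller.stream.listen((_) {});
  final afterTwoListeners = listenCalls;

  await first.cancel();
  await second.cancel();
  final third = controller.stream.listen((_) {});
  final afterResubscribe = listenCalls;

  await third.cancel();
  await controller.close();
  return [afterTwoListeners, afterResubscribe];
}

Future<void> main() async {
  print(await countListenCalls()); // [1, 2]
}
```

So if the request has to go out for each individual subscriber, onListen on the broadcast controller alone is probably not enough.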

Upvotes: 0
