Reputation: 1728
I have a broadcast stream that exposes the data stream of a BLE device.
To get data out of that stream, I first need to send a request to the device.
Stream<Data> dataStream() {
  sendDataRequestToDevice();
  return _broadcastController.stream;
}
The problem is that everything is asynchronous: the request goes out as soon as dataStream() is called, so by the time the caller actually listens to the returned stream, the event sent by the device has very likely already passed. I am looking for something like:
Stream<Data> dataStream() {
  return _broadcastController.stream
      .doOnSubscribe(() => sendDataRequestToDevice()); // stolen from rxjava ;)
}
Is there something like this in the default stream library (dart:async), without using RxDart or similar? (I don't want to pull it in just for this purpose.)
Upvotes: 0
Views: 59
Reputation: 71623
If you care how many subscribers you have, you probably shouldn't be using a broadcast stream. The underlying idea of a broadcast stream is that it broadcasts without knowing (or caring) who is listening.
The onListen and onCancel callbacks were originally not part of a broadcast controller, and they break the model slightly. They do allow you to know when nobody is listening, but that is all.
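To make that limitation concrete, here is a small sketch that logs when a broadcast controller's callbacks fire. The function name traceBroadcastCallbacks is made up for illustration; the controller API is the one from dart:async.

```dart
import 'dart:async';

/// Logs when the broadcast controller's callbacks fire.
/// (Hypothetical helper, written only to illustrate the point above.)
Future<List<String>> traceBroadcastCallbacks() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast(
    onListen: () => log.add('onListen'),
    onCancel: () => log.add('onCancel'),
  );

  final first = controller.stream.listen((_) {}); // 0 -> 1 listeners: onListen
  final second = controller.stream.listen((_) {}); // 1 -> 2 listeners: nothing
  await first.cancel(); // 2 -> 1 listeners: nothing
  await second.cancel(); // 1 -> 0 listeners: onCancel
  final third = controller.stream.listen((_) {}); // 0 -> 1 again: onListen
  await third.cancel(); // 1 -> 0 again: onCancel
  await controller.close();
  return log;
}
```

The second listener never triggers onListen, so a request sent from that callback goes out only for the first subscriber.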
In a situation like this, I'd make my own stream which records the listens and cancels.
import 'dart:async';

/// Wraps a stream and reports every listen and cancel through callbacks.
class _ListenStream<T> extends Stream<T> {
  final Stream<T> _source;
  final void Function()? _onListen;
  final void Function()? _onCancel;
  _ListenStream(this._source, this._onListen, this._onCancel);

  @override
  bool get isBroadcastStream => _source.isBroadcastStream;

  @override
  StreamSubscription<T> listen(void Function(T)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    _onListen?.call();
    return _ListenSubscription<T>(
        _source.listen(onData,
            onError: onError, onDone: onDone, cancelOnError: cancelOnError),
        _onCancel);
  }
}

/// Delegates to the wrapped subscription and calls back once it is cancelled.
class _ListenSubscription<T> implements StreamSubscription<T> {
  final StreamSubscription<T> _source;
  final void Function()? _onCancel;
  _ListenSubscription(this._source, this._onCancel);

  @override
  void onData(void Function(T data)? handleData) => _source.onData(handleData);
  @override
  void onError(Function? handleError) => _source.onError(handleError);
  @override
  void onDone(void Function()? handleDone) => _source.onDone(handleDone);
  @override
  void pause([Future<void>? resumeSignal]) => _source.pause(resumeSignal);
  @override
  void resume() => _source.resume();
  @override
  Future<E> asFuture<E>([E? futureValue]) => _source.asFuture<E>(futureValue);
  @override
  bool get isPaused => _source.isPaused;

  @override
  Future<void> cancel() {
    var future = _source.cancel();
    final onCancel = _onCancel;
    if (onCancel != null) future = future.whenComplete(onCancel);
    return future;
  }
}
Then, instead of providing the original stream, you give your clients the _ListenStream wrapper of the original stream, along with the callbacks you want called on listen and cancel.
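If your Dart SDK provides Stream.multi, a shorter variant of the same idea may be enough. This is only a sketch, not the wrapper class described above: doOnListen is a hypothetical helper name, and it assumes you only need a per-listener listen callback rather than a cancel callback as well.

```dart
import 'dart:async';

/// Forwards [source] and runs [onListen] once for every subscriber.
/// `doOnListen` is a made-up name for illustration, not a library API.
Stream<T> doOnListen<T>(Stream<T> source, void Function() onListen) {
  return Stream<T>.multi((controller) {
    // Attach to the source first, so events triggered by onListen are not missed.
    final subscription = source.listen(
      controller.addSync,
      onError: controller.addErrorSync,
      onDone: controller.closeSync,
    );
    // Propagate pause, resume and cancel to the source subscription.
    controller
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = subscription.cancel;
    onListen();
  }, isBroadcast: source.isBroadcastStream);
}
```

With the names from the question, dataStream() could then return doOnListen(_broadcastController.stream, sendDataRequestToDevice). Each subscriber triggers its own request, which may or may not be what the device expects.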
Upvotes: 1
Reputation: 1728
For everyone else searching for a solution:
I don't find this a good solution, but it does the job. Please tell me there's a better solution!
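import 'dart:async';
import 'package:async/async.dart'; // provides StreamGroup

Stream<Data> dataStream() {
  return StreamGroup.merge([
    doOnSubscribe<Data>(() => sendDataRequestToDevice()),
    _broadcastController.stream,
  ]);
}

// Emits no events; it only runs onSubscribe when listened to, then closes.
Stream<T> doOnSubscribe<T>(void Function() onSubscribe) {
  late final StreamController<T> controller;
  controller = StreamController<T>(
    onListen: () {
      onSubscribe();
      controller.close();
    },
  );
  return controller.stream;
}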
Sadly it doesn't do the job for my BLE solution, because of other issues.
Upvotes: 0
Reputation: 657188
It seems you are looking for onListen:
https://api.dartlang.org/stable/2.1.0/dart-async/StreamController/onListen.html
_broadcastController.onListen = () {
  _broadcastController.add('foo');
};
You can also pass the handler to the constructor https://api.dartlang.org/stable/2.1.0/dart-async/StreamController/StreamController.html
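A minimal sketch of the constructor form, applied to the situation in the question. DeviceClient and the injected request function are hypothetical stand-ins for the asker's BLE code, and the String event type is assumed only to keep the example small.

```dart
import 'dart:async';

/// Hypothetical stand-in for the question's BLE wrapper.
class DeviceClient {
  DeviceClient(this._sendDataRequestToDevice);

  // Assumed to send the request that makes the device start reporting data.
  final void Function() _sendDataRequestToDevice;

  // The handler is passed to the constructor instead of being assigned later.
  late final StreamController<String> _broadcastController =
      StreamController<String>.broadcast(onListen: _sendDataRequestToDevice);

  Stream<String> dataStream() => _broadcastController.stream;
}
```

Because this is a broadcast controller, the request is sent when the first listener subscribes, not for every later one.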
Upvotes: 0