Reputation: 5051
RxDart has the Rx.combineLatest method, which combines the latest values of several streams using a callback function.
The problem is that it only emits once every stream has emitted at least one value. If any one of them has not emitted yet, nothing is emitted.
Merges the given Streams into a single Stream sequence by using the combiner function whenever any of the stream sequences emits an item. The Stream will not emit until all streams have emitted at least one item.
I'm trying to combine multiple streams into one validation stream that emits true or false even when some of the streams have not emitted yet or have emitted an empty value.
class FormBloc {
final BehaviorSubject<bool> _result = BehaviorSubject();
final BehaviorSubject<String?> _usernameController = BehaviorSubject();
final BehaviorSubject<String?> _emailController = BehaviorSubject();
// Will only emit if each stream emitted a value
// If only username is emitted valid is not emitted
Stream<bool> get valid$ => Rx.combineLatest2(
_usernameController.stream,
_emailController.stream,
(username, email) => username != null || email != null
);
}
How can I join those streams so that valid$ emits a value whenever any of the streams changes?
Upvotes: 6
Views: 5191
Reputation: 5051
Because all of the solutions here are workarounds, I've implemented my own stream class. The implementation is the same as the original CombineLatestStream, except that it does not wait for all streams to emit before emitting:
import 'dart:async';
import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/subscription.dart';
class CombineAnyLatestStream<T, R> extends StreamView<R> {
CombineAnyLatestStream(List<Stream<T>> streams, R Function(List<T?>) combiner) : super(_buildController(streams, combiner).stream);
static StreamController<R> _buildController<T, R>(
Iterable<Stream<T>> streams,
R Function(List<T?> values) combiner,
) {
int completed = 0;
late List<StreamSubscription<T>> subscriptions;
List<T?>? values;
final _controller = StreamController<R>(sync: true);
_controller.onListen = () {
void onDone() {
if (++completed == streams.length) {
_controller.close();
}
}
subscriptions = streams.mapIndexed((index, stream) {
return stream.listen(
(T event) {
final R combined;
if (values == null) return;
values![index] = event;
try {
combined = combiner(List<T?>.unmodifiable(values!));
} catch (e, s) {
_controller.addError(e, s);
return;
}
_controller.add(combined);
},
onError: _controller.addError,
onDone: onDone
);
}).toList(growable: false);
if (subscriptions.isEmpty) {
_controller.close();
} else {
values = List<T?>.filled(subscriptions.length, null);
}
};
_controller.onPause = () => subscriptions.pauseAll();
_controller.onResume = () => subscriptions.resumeAll();
_controller.onCancel = () {
values = null;
return subscriptions.cancelAll();
};
return _controller;
}
}
Upvotes: 3
Reputation: 1151
You could seed your BehaviorSubjects with a default value of null:
final BehaviorSubject<String?> _usernameController = BehaviorSubject.seeded(null);
final BehaviorSubject<String?> _emailController = BehaviorSubject.seeded(null);
Another possibility would be to give the combined stream a seed value:
Rx.combineLatest2(
_usernameController.stream,
_emailController.stream,
(username, email) => username != null || email != null
).shareValueSeeded(false);
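A related variant, which is not part of the answer above, is to prepend a null to each input with startWith so that combineLatest2 has a value for every stream from the start. The following is only a minimal sketch: it assumes the rxdart package is available, validStream is a hypothetical helper name, and the validity rule (at least one non-empty field) is an assumption about what the question wants.

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical helper (not from the original posts): combines two nullable
/// string streams into a validity stream without waiting for both to emit.
Stream<bool> validStream(Stream<String?> username, Stream<String?> email) {
  // Assumed rule: a field counts as filled when it is non-null and non-empty.
  bool filled(String? s) => s != null && s.isNotEmpty;

  return Rx.combineLatest2<String?, String?, bool>(
    // startWith(null) gives each input an initial value, so the combiner
    // can run as soon as either real stream emits.
    username.startWith(null),
    email.startWith(null),
    (u, e) => filled(u) || filled(e),
  );
}

Future<void> main() async {
  final username = BehaviorSubject<String?>();
  final email = BehaviorSubject<String?>();

  final sub = validStream(username.stream, email.stream).listen(print);

  // Only the username field emits; email never does.
  username.add('alice');
  await Future<void>.delayed(Duration.zero);

  await sub.cancel();
  await username.close();
  await email.close();
}
```

Unlike seeding the subjects themselves, this leaves the BehaviorSubjects without an initial value, which may matter if other listeners should not see a null first.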
Upvotes: 1
Reputation: 1150
Instead of combining multiple streams, you can use one BehaviorSubject<Map<String, String?>> to emit changes in username or email.
Add either the changed/submitted username or email to the BehaviorSubject:
_usernameEmailController.add({"uname": value});
or
_usernameEmailController.add({"email": value});
so that you can validate the inputs by listening to it. I used StreamBuilder to display the emitted values:
StreamBuilder<Map<String, String?>>(
stream: _usernameEmailController.stream
.map((data) {
_r = {..._r, ...data};
return _r;
}),
builder: (context, snapshot) {
return Column(
children: [
Text(snapshot.data.toString()),
if (snapshot.hasData)
Text(
"Is valid?: "
"${(snapshot.data!["uname"] != null && snapshot.data!["uname"]!.isNotEmpty) || (snapshot.data!["email"] != null && snapshot.data!["email"]!.isNotEmpty)}"
),
],
);
},
),
Check out my solution on DartPad here.
In the DartPad I have used StreamController instead of BehaviorSubject, as DartPad doesn't support the rxdart package.
But you can replace line 40 in the DartPad
final StreamController<Map<String, String?>> _usernameEmailController =
StreamController();
with
final BehaviorSubject<Map<String, String?>> _usernameEmailController =
BehaviorSubject();
if you want to use BehaviorSubject.
Upvotes: 0
Reputation: 1
My preferred approach is to create a new stream that first emits the current value and then forwards the original stream.
class FormBloc {
final BehaviorSubject<bool> _result = BehaviorSubject();
final BehaviorSubject<String?> _usernameController = BehaviorSubject();
final BehaviorSubject<String?> _emailController = BehaviorSubject();
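// `late` is required: a plain field initializer cannot read other instance fields.
// valueOrNull is used because, in recent rxdart versions, `value` throws when the subject has not emitted yet.
late final _usernameStreamController = StreamController<String?>()
..add(_usernameController.valueOrNull)
..addStream(_usernameController.stream);
late final _emailStreamController = StreamController<String?>()
..add(_emailController.valueOrNull)
..addStream(_emailController.stream);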
Stream<bool> get valid$ => Rx.combineLatest2(
_usernameStreamController.stream, // use streamController instead
_emailStreamController.stream, // use streamController instead
(username, email) => username != null || email != null
);
}
Upvotes: 0