Reputation: 165
I need to run many async functions concurrently and yield the results as they complete, order doesn't matter.
Here is what i have in a simplified example, of course this does not work right because it's waiting for every response before moving to the next request.
Stream<String> stringGenerator(List<http.Request> requests) async* {
final httpClient = http.Client();
for (var req in requests) {
final response = await httpClient.send(req);
yield response.headers['example'];
}
}
Upvotes: 2
Views: 307
Reputation: 71693
A generalized alternative to @julemand101's solution which works for any kind of futures:
Stream<T> fromFutures<T>(Iterable<Future<T>> futures) {
var pending = 0;
var controller = Controller<T>();
for (var future in futures) {
pending++;
future.then((v) {
controller.add(v);
if (--pending == 0) controller.close();
}, onError: (e, s) {
controller.addError(e, s);
if (--pending == 0) controller.close();
});
}
return controller.stream;
}
You can use this the specify stringGenerator
as:
Stream<String> stringGenerator(List<http.Request> requests) async* {
var client = http.Client();
yield* fromFutures(requests.map(client.send));
}
Upvotes: 0
Reputation: 31219
Could you try and see if this is working for you?
Stream<String> stringGenerator(List<http.Request> requests) {
final controller = StreamController<String>();
final httpClient = http.Client();
Future.wait(requests.map((req) => httpClient
.send(req)
.then((response) => controller.add(response.headers['example']!))))
.whenComplete(() => controller.close());
return controller.stream;
}
More correct would be this, since we don't want to generate events before we are listening for them according to the documentation for StreamController
. It is really not an issue for internal usage since StreamController
does buffer events until a listener are subscribed:
Stream<String> stringGenerator(List<http.Request> requests) {
final controller = StreamController<String>();
controller.onListen = () {
final httpClient = http.Client();
Future.wait(requests.map((req) => httpClient
.send(req)
.then((response) => controller.add(response.headers['example']!))))
.whenComplete(() => controller.close());
};
return controller.stream;
}
Upvotes: 2