liloargana
liloargana

Reputation: 165

Yield values from generator from async functions without awaiting

I need to run many async functions concurrently and yield the results as they complete, order doesn't matter.

Here is what i have in a simplified example, of course this does not work right because it's waiting for every response before moving to the next request.

 Stream<String> stringGenerator(List<http.Request> requests) async* {
    final httpClient = http.Client();
    for (var req in requests) {
      final response = await httpClient.send(req);
      yield response.headers['example'];
    }
  }

Upvotes: 2

Views: 307

Answers (2)

lrn
lrn

Reputation: 71693

A generalized alternative to @julemand101's solution which works for any kind of futures:

Stream<T> fromFutures<T>(Iterable<Future<T>> futures) {
  var pending = 0;
  var controller = Controller<T>();
  for (var future in futures) {
    pending++;
    future.then((v) {
      controller.add(v);
      if (--pending == 0) controller.close();
    }, onError: (e, s) {
      controller.addError(e, s);
      if (--pending == 0) controller.close();
    });
  }
  return controller.stream;
}

You can use this the specify stringGenerator as:

Stream<String> stringGenerator(List<http.Request> requests) async* {
  var client = http.Client();
  yield* fromFutures(requests.map(client.send));
}

Upvotes: 0

julemand101
julemand101

Reputation: 31219

Could you try and see if this is working for you?

Stream<String> stringGenerator(List<http.Request> requests) {
  final controller = StreamController<String>();
  final httpClient = http.Client();

  Future.wait(requests.map((req) => httpClient
          .send(req)
          .then((response) => controller.add(response.headers['example']!))))
      .whenComplete(() => controller.close());

  return controller.stream;
}

More correct would be this, since we don't want to generate events before we are listening for them according to the documentation for StreamController. It is really not an issue for internal usage since StreamController does buffer events until a listener are subscribed:

Stream<String> stringGenerator(List<http.Request> requests) {
  final controller = StreamController<String>();
  
  controller.onListen = () {
    final httpClient = http.Client();

    Future.wait(requests.map((req) => httpClient
        .send(req)
        .then((response) => controller.add(response.headers['example']!))))
        .whenComplete(() => controller.close());
  };

  return controller.stream;
}

Upvotes: 2

Related Questions