Keyser Soze
Keyser Soze

Reputation: 405

Merge Firestore's separate queries Streams in Dart

I'm implementing a Todo Application in Flutter. I need to merge a double query in client, in order to perform an OR request in Firestore.

One hand, I have the following code that performs the double queries.

Future<Stream> combineStreams() async {
  Stream stream1 = todoCollection
      .where("owners", arrayContains: userId)
      .snapshots()
      .map((snapshot) {
    return snapshot.documents
        .map((doc) => Todo.fromEntity(TodoEntity.fromSnapshot(doc)))
        .toList();
  });
  Stream stream2 = todoCollection
      .where("contributors", arrayContains: userId)
      .snapshots()
      .map((snapshot) {
    return snapshot.documents
        .map((doc) => Todo.fromEntity(TodoEntity.fromSnapshot(doc)))
        .toList();
  });

  return StreamZip(([stream1, stream2])).asBroadcastStream();
}

And other hand, I have the following code that will perform the update of view with the Bloc pattern.

Stream<TodosState> _mapLoadTodosToState(LoadTodos event) async* {

    _todosSubscription?.cancel();

    var res = await _todosRepository.todos(event.userId);

    _todosSubscription = res.listen(
          (todos) {
        dispatch(
           TodosUpdated(todos));
      },
    );

  }

I have the following error.

flutter: Instance of '_AsBroadcastStream<List<List<Todo>>>'
[VERBOSE-2:ui_dart_state.cc(148)] Unhandled Exception: type 'List<List<Todo>>' is not a subtype of type 'List<Todo>'

I tried to look for more infos with the debugger and it turned out that my StreamZip source contains the 2 stream separately.

For the moment I can get one stream at a time.

I don't know how to proceed in order to get the 2 streams and display them.

Upvotes: 1

Views: 398

Answers (1)

haroldolivieri
haroldolivieri

Reputation: 2283

You're doing a map of a map, which returns a List of another List. You should zip the QuerySnapshot streams and do the mapping after creating the subscription, and then you can create a new Stream<List<Todo>> from it.

///private method to zip QuerySnapshot streams
Stream<List<QuerySnapshot>> _combineStreams() async {
  Stream stream1 = todoCollection
      .where("owners", arrayContains: userId)
      .snapshots()
  });
  Stream stream2 = todoCollection
      .where("contributors", arrayContains: userId)
      .snapshots()
  });

  return StreamZip(([stream1, stream2])).asBroadcastStream();
}

///exposed method to be consumed by repository
Stream<List<Todo>> todosStream() {
  var controller = StreamController<List<Todo>>();

  _combineStreams().listen((snapshots) {
    List<DocumentSnapshot> documents;

    snapshots.forEach((snapshot) {
      documents.addAll(snapshot.documents);
    });

    final todos = documents.map((document) {
      return Todo.fromEntity(TodoEntity.fromSnapshot(doc));
    }).toList();

    controller.add(todos);
  });

  return controller.stream;
}

Upvotes: 5

Related Questions