Flutter: merge two Firestore streams into a single stream

I simply want to perform an 'OR' operation and get the results of both queries in one stream.

Here's my code with a single stream:

 StreamBuilder(
    stream: Firestore.instance
        .collection('list')
        .where('id', isEqualTo: 'false')
        .orderBy('timestamp')
        .snapshots(),
    builder: (context, snapshot) {
      if (!snapshot.hasData)
        return Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Center(
              child: CircularProgressIndicator(),
            )
          ],
        );
      if (snapshot.data.documents.length == 0)
        return const Center(
          child: Text(
            "Not Available",
            style: TextStyle(fontSize: 30.0, color: Colors.grey),
          ),
        );
      return ListView.builder(
        padding: EdgeInsets.all(5.0),
        key: Key(randomString(20)),
        itemCount: snapshot.data.documents.length,
        itemBuilder: (BuildContext context, int index) {
          return ListCard(snapshot.data.documents[index]);
        },
      );
    }),

Instead of a single stream, I now want to feed two streams to the same StreamBuilder.

I tried StreamGroup, but it doesn't work because the widgets rebuild:

StreamGroup.merge([streamOne, streamTwo]).asBroadcastStream();

I also tried the following method:

Stream<List<DocumentSnapshot>> searchResult() {
  List<Stream<List<DocumentSnapshot>>> streamList = [];

  Firestore.instance
      .collection('room-list')
      .where('id', isEqualTo: 'false')
      .snapshots()
      .forEach((snap) {
    streamList.add(Observable.just(snap.documents));
  });

  Firestore.instance
      .collection('room-list')
      .where('id', isEqualTo: 'pending')
      .snapshots()
      .forEach((snap) {
    streamList.add(Observable.just(snap.documents));
  });

  var x = Observable.merge(streamList)
      .scan<List<DocumentSnapshot>>((acc, curr, i) {
    return acc ?? <DocumentSnapshot>[]
      ..addAll(curr);
  });
  return x;
}

Here I get an error saying there should be at least one stream to merge. That's because Observable.merge(streamList) is called before any items are added to streamList.

I simply want to get the results of both queries in one stream.

Upvotes: 12

Views: 21162

Answers (7)

Leo

Reputation: 21

Here is my solution to this problem without any package. In my particular case the same document can match both queries, so to avoid duplicates I use a set as an intermediary.

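// Requires: import 'dart:async';
static Stream<List<CleaningEvent>> streamForCleaner(
  String cleanerID,
  DateTime rawDate,
) {
  final date = DateTime.utc(rawDate.year, rawDate.month, rawDate.day);
  Iterable<CleaningEvent> eventsS1 = [], eventsS2 = [];
  StreamSubscription<QuerySnapshot>? sub1, sub2;
  late final StreamController<List<CleaningEvent>> controller;

  // Re-emit the union of the latest results of both queries.
  // The set only removes duplicates if CleaningEvent overrides == and hashCode.
  void update() {
    controller.add(<CleaningEvent>{...eventsS1, ...eventsS2}.toList());
  }

  final Stream<QuerySnapshot> stream1 = _collection
      .where("cleanersIDs", arrayContains: cleanerID)
      .where("reqDate", isEqualTo: date)
      .snapshots();
  final Stream<QuerySnapshot> stream2 = _collection
      .where("cleanersIDs", arrayContains: cleanerID)
      .where("schDate", isEqualTo: date)
      .snapshots();

  controller = StreamController<List<CleaningEvent>>(
    // Subscribe to Firestore only once the returned stream has a listener.
    onListen: () {
      sub1 = stream1.listen((query) {
        eventsS1 = query.docs.map(CleaningEvent.fromDoc);
        update();
      }, onError: controller.addError);
      sub2 = stream2.listen((query) {
        eventsS2 = query.docs.map(CleaningEvent.fromDoc);
        update();
      }, onError: controller.addError);
    },
    // Stop both Firestore listeners when the consumer cancels.
    onCancel: () async {
      await sub1?.cancel();
      await sub2?.cancel();
    },
  );

  return controller.stream;
}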
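
If the model class does not override == and hashCode, a set will not remove duplicates. One possible alternative is to key the merged results by document ID. The sketch below is only an illustration in plain Dart: Doc and mergeById are hypothetical names, and the integer timestamp stands in for whatever field the queries are ordered by. It also re-sorts the result, since concatenating two ordered query results does not give a globally ordered list.

```dart
/// Hypothetical stand-in for a Firestore document: just an id and a timestamp.
class Doc {
  final String id;
  final int timestamp;
  const Doc(this.id, this.timestamp);
}

/// Merges two query results, keeping one entry per id, ordered by timestamp.
List<Doc> mergeById(Iterable<Doc> first, Iterable<Doc> second) {
  final byId = <String, Doc>{};
  for (final doc in first) {
    byId[doc.id] = doc;
  }
  for (final doc in second) {
    // An entry with an id already seen replaces the earlier one.
    byId[doc.id] = doc;
  }
  return byId.values.toList()
    ..sort((x, y) => x.timestamp.compareTo(y.timestamp));
}
```

With real snapshots, the same idea would presumably key on each document's ID instead of the hypothetical Doc.id.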

Upvotes: 0

Faz

Reputation: 359

I was also trying to combine two streams from Firestore (as querying does not support OR) and went about it like this:

import 'package:rxdart/rxdart.dart';

Rx.combineLatest2(
  streamQuerySnapshot1, // a Stream<QuerySnapshot> from Firestore
  streamQuerySnapshot2, // another Stream<QuerySnapshot> from Firestore
  (QuerySnapshot snapshot1, QuerySnapshot snapshot2) {
    // Concatenate the documents of the latest snapshot from each query.
    return [...snapshot1.docs, ...snapshot2.docs];
  },
);

This emits whenever either stream changes (once both have emitted at least once), in contrast to other solutions I found, which emit only when both streams have changes.
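
To see that emission behaviour without Firestore or RxDart, here is a rough sketch using only dart:async. It is not RxDart's implementation: combineLatestLists is a hypothetical helper, and plain lists stand in for the documents of each query.

```dart
import 'dart:async';

/// Hypothetical helper: emits the concatenation of the latest lists from
/// [a] and [b]. Nothing is emitted until both have produced a value.
Stream<List<T>> combineLatestLists<T>(Stream<List<T>> a, Stream<List<T>> b) {
  List<T>? latestA, latestB;
  StreamSubscription<List<T>>? subA, subB;
  var openSources = 2;
  late final StreamController<List<T>> controller;

  // Emit only when both sources have delivered at least one list.
  void emit() {
    final left = latestA, right = latestB;
    if (left != null && right != null) controller.add([...left, ...right]);
  }

  // Close the output once both sources are done.
  void sourceDone() {
    if (--openSources == 0) controller.close();
  }

  controller = StreamController<List<T>>(
    onListen: () {
      subA = a.listen((value) {
        latestA = value;
        emit();
      }, onError: controller.addError, onDone: sourceDone);
      subB = b.listen((value) {
        latestB = value;
        emit();
      }, onError: controller.addError, onDone: sourceDone);
    },
    // Cancel both source subscriptions when the consumer stops listening.
    onCancel: () async {
      await subA?.cancel();
      await subB?.cancel();
    },
  );
  return controller.stream;
}
```

A StreamBuilder could take the returned stream directly. It is probably best to create the stream once (for example in initState) rather than on every build, so that rebuilds do not resubscribe.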

Upvotes: 6

The best way I found is to use MergeStream from RxDart:

Stream<QuerySnapshot> searchResult() {
  final falseRoomStream = FirebaseFirestore.instance
      .collection('room-list')
      .where('id', isEqualTo: 'false')
      .snapshots();

  final pendingRoomStream = FirebaseFirestore.instance
      .collection('room-list')
      .where('id', isEqualTo: 'pending')
      .snapshots();

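  // Each event is one QuerySnapshot from whichever query emitted;
  // the two result sets are interleaved, not combined into one list.
  return MergeStream([falseRoomStream, pendingRoomStream]);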
}

Upvotes: 0

Abhishek Kumar

Reputation: 21

I'm late, but I'll put this out there anyway.

Since both conditions are on the same field, you can use a whereIn clause in a single query, like this:

Firestore.instance
    .collection("collection_name")
    .where("field", whereIn: ["false", "true"])
    .snapshots();

Upvotes: 2

Vishal

Reputation: 283

I used the RxDart package to combine two streams, as shown below.

RxDart - CombineLatestStream

final Stream<DocumentSnapshot> user = Firestore.instance
    .collection("users")
    .document(firebaseUser.uid)
    .snapshots();

final Stream<QuerySnapshot> cards =
    Firestore.instance.collection("cards").snapshots();

CombineLatestStream.list([user, cards]).listen((data) {
  add(LoadedHomeEvent(
    data.elementAt(0),
    data.elementAt(1),
  ));
});

Upvotes: 3

m-bhole

Reputation: 1189

This should work.

// StreamZip comes from package:async.
import 'package:async/async.dart';

// Change your streams here.
// Note: StreamZip emits a new list only after every stream has emitted a new event.
Stream<List<QuerySnapshot>> getData() {
  Stream<QuerySnapshot> stream1 = Firestore.instance
      .collection('list')
      .where('id', isEqualTo: 'false')
      .orderBy('timestamp')
      .snapshots();
  Stream<QuerySnapshot> stream2 = Firestore.instance
      .collection('list')
      .where('id', isEqualTo: 'true')
      .orderBy('timestamp')
      .snapshots();
  return StreamZip([stream1, stream2]);
}

@override
Widget build(BuildContext context) {
  return Scaffold(
    body: StreamBuilder<List<QuerySnapshot>>(
      stream: getData(),
      builder: (BuildContext context,
          AsyncSnapshot<List<QuerySnapshot>> snapshot1) {
        // Show a spinner until both streams have delivered a snapshot.
        if (!snapshot1.hasData)
          return Column(
            mainAxisAlignment: MainAxisAlignment.center,
            children: <Widget>[
              Center(
                child: CircularProgressIndicator(),
              )
            ],
          );

        // Copy the documents from both query snapshots into one new list.
        final List<DocumentSnapshot> documents = [
          ...snapshot1.data[0].documents,
          ...snapshot1.data[1].documents,
        ];

        if (documents.isEmpty)
          return const Center(
            child: Text(
              "Not Available",
              style: TextStyle(fontSize: 30.0, color: Colors.grey),
            ),
          );

        return ListView(
          children: documents.map((DocumentSnapshot document) {
            // Put your logic here. "document" can come from either stream.
            return ListCard(document);
          }).toList(),
        );
      },
    ),
  );
}

Hope this helps!!!

Upvotes: 7

Jason Scott

Reputation: 673

I’m not sure why you’re using forEach and Observable.just().

You can merge the two Firestore streams directly, like this:

Observable.merge([stream1, stream2]).pipe(combineStream);

Here stream1 and stream2 are just your Firestore snapshot streams.

Upvotes: 2
