oliver
oliver

Reputation: 9475

How can I populate a stream with a value from a future?

I'm using streams to monitor changes in my firestore backend. That works all right for updates, but I do not get the initial value.

class A {
  final Stream<DocumentSnapshot> _visitorCount =
      Firestore.instance.document('ServerData/serverStatus').snapshots();

  Stream<int> get visitorCount {
    return Observable(_visitorCount)
        .map((DocumentSnapshot doc) => doc['activeUsers'] as int);
  }
}

I could use rxdart's startWith to provide an initial value, but reading out this value results in a Future<int> and I'd need to supply an int to startWith.

static Future<int> f = Firestore.instance
  .collection('ServerData')
  .document('serverStatus')
  .get()
  .then((doc) => doc['activeUsers'] as int);

down the road I use it like this:

StreamBuilder(
          stream: _visitorCount,
          initialData: 0,
          builder: (context, snapshot) =>
              Text('${snapshot.data} users logged in') ...

I provide 0 as initial value but this is of course just a temporary value. I'm sure this is so obvious but still I struggle to get it right. What's the best way to solve this?

Upvotes: 2

Views: 3714

Answers (4)

Vittorio Paragallo
Vittorio Paragallo

Reputation: 1

At first a future can be converted to a stream with the asStream() method, as the future ends the stream will fire the event and close. Then you can use the possibility to concatenate the streams. So the main idea is to prepend your main stream with the future converted to stream.

    Stream<int> seedStream =  Future.value(1).asStream();
    Stream<int> mainStream=  Stream.fromIterable([2, 3, 4, 5]);
    Stream<int> populatedStream=ConcatStream([seedStream,mainStream]);

The Contactstream from Rx package will subscribe and emit only from seedStream till it ends. After seedStream ends ConcatStream subscrives to mainSrtream and keeps emitting from that.

There is also a solution without RxDart dependency by using yield:

 Stream<int> combinedStream() async* {
    // Step 1: Emit the value to preload
    yield await Future.value(1);
    // Step 2: Emit all the others from the main stream
    yield* Stream.fromIterable([2, 3, 4, 5])
  }

This solution is identic to ConcatStream (but not to ConcatStreamEager)

Upvotes: 0

oliver
oliver

Reputation: 9475

Turns out the problem I needed to solve did not require me to add some value from a future into the stream to make up for missed values. All I needed is to use a BehaviorSubject as pointed out by Remi Rousselet. But for completeness: here is how you take a value from a Future and push it into a stream:

Inserting a value from a Future into a stream

class A {
  Subject<int> _stateSubject;
  Subject<int> _state;
  Subject<int> get state {
    return _state;
  }

  A() {
    _stateSubject = new BehaviorSubject();
    _state = _stateSubject;

    Firestore.instance
        .document('ServerData/serverStatus')
        .snapshots()
        .map((DocumentSnapshot doc) => doc['activeUsers'] as int)
        .listen((int value) {
      _stateSubject.add(value);
    });
  }

  Future<int> get visitorCount => Firestore.instance
      .document('ServerData/serverStatus')
      .get()
      .then((DocumentSnapshot d) => d.data['activeUsers']);
  void addFromFuture() => visitorCount.then((int v) => _state.add(v + 1));
}

By calling addFromFuture I can query some value that will result in a Future<int>, take this value and push it into the Subject with the add() function. All the listeners to the _state-Subject will receive those updates.

Why is using a BehaviorSubject easier?

As an explanation why BehaviorSubject saves us from using this awkward workaround with Futures (taken from the RxJava docs):

Even though another subscriber came in late, the most recent received item is buffered and delivered correctly.

Upvotes: 5

Edman
Edman

Reputation: 5605

Firestore needs time to load the data before you can have something useful emitted to your StreamBuilder. The hasData property of the various snapshots can be used to react to this state at build time.

StreamBuilder(
  stream: _visitorCount,
  builder: (context, snapshot) {
    if (!snapshot.hasData) return CircularProgressIndicator(); // Loading..
    // Here the data is loaded and available
  },
)

Upvotes: 0

R&#233;mi Rousselet
R&#233;mi Rousselet

Reputation: 276997

Use a BehaviorSubject to provide initial value.

class A {
  Subject<String> _stateSubject;
  Stream<String> _state;
  Stream<String> get state {
    return _state;
  }

  A() {
    _stateSubject = new BehaviorSubject(seedValue: "initial value");
    _state = _stateSubject.asBroadcastStream();

    Stream fireStoreStream; // your firestore query here
    fireStoreStream.listen((value) {
      _stateSubject.add(value.toString()); // push your new value
    });
  }
}

Upvotes: 4

Related Questions