f.khantsis


combineLatest doesn't subscribe to parameter Observables, doesn't deliver data

I have a fairly common use case. I am loading an array of objects from FirebaseDatabase, and attaching images from Firebase Storage.

FirebaseDatabase returns an Observable based on a query. Every time data in the db changes, the Observable emits an updated query result. I also attach a Promise from the Storage API to each entry in the result by mapping every entry to a [Domain, ImagePromise] tuple:

getDomainObj(){
   return this.af.database.list(this.listRef)
    .map((mps: Domain[]) => 
       mps.map((mp: Domain) =>  
         [mp, this.getImage(mp['$key'])]));
}

Two issues:

  1. Some mps don't have an image.
  2. Since images are not in the firedb, the observable will not emit on an image change. To fix that, I added a Cloud Function that sends a push notification on every image update (outside the scope of this question; it works, so no need to go into it). Each mp is its own PubSub topic. The client subscribes to the topic and gets an Observable of updates that emits every time an image for a specific mp is uploaded.

I solve both by mapping:

this.mpSvc.getDomainObj()
  .map((tupleArray: [Domain, Promise<string>][]) => 
    tupleArray.map(([mp, imageSrcPromise]: [Domain, Promise<string>]) => {
      return [mp, 
           Observable.fromPromise(imageSrcPromise) // get initial image
            .startWith(null) // make sure that it emits at least once
            .concat( // updates
              this.mpSvc.signUpForPictureUpdates(mp['$key'])
              .map(([id, imageSrc]: [string, string]) => imageSrc))

      ];
    })
)

mpSvc.signUpForPictureUpdates returns an Observable of [domainId, imageSrc] tuples that emits every time the image for a domain is re-uploaded.

Now the ultimate goal is to have an Observable of an array of [Domain, imageSrc] pairs, which emits every time the image of any Domain object changes, any Domain object itself changes, or the result of the query changes.

First I remap again: instead of emitting an array of [Domain, Observable] pairs, I emit an array of Observable<[Domain, imageSrc]>:

.map((tupleArray: [Domain, Observable<string>][]) => 
  tupleArray.map(([mp, imageSrcObservable]: [Domain, Observable<string>]) => 
     imageSrcObservable.map((imageSrc: string) => [mp, imageSrc])))

Now I have an Observable of an array of Observables of pairs. The parent Observable emits every time a Domain or the query result changes, and each child emits every time the image for a specific domain changes.

For the final step, I combine the child observables into a single observable and switchMap to it:

.switchMap((imageObservables: Observable<[Domain, string]>[]) => 
  Observable.combineLatest(imageObservables)
)

The result is properly subscribed to (in fact, it is displayed in an Angular template via ngFor|async, but that is also outside the scope).

Unfortunately, the results are not what I expected. In reality, I have four result objects from the query, two with images and two without. What I see is not consistent: sometimes both images load, sometimes one, and most often neither.

Specifically, if I were to add a logging line to the end like so:

.do(x => console.log('final', x), x => console.error('finalerror', x), () => console.log('finalcomplete'));

I always get at least one log line with nulls in the imageSrc, but hardly ever additional lines with the resolved, real imageSrcs.

Subscribing before the last step (combineLatest) gets all the data properly.

What am I doing wrong?

Edit: after looking into this more, the problem is definitely with combineLatest. I tried replacing combineLatest with imageObservables[1].map(x=>[x]) and it works perfectly (although of course it only returns one value instead of an array). Also, stepping through combineLatest led me to some closing, which is weird, since what would need to be closed at any point?

Upvotes: 0

Views: 1163

Answers (1)

f.khantsis


OK, I found the problem. fromPromise emits an error if the Promise rejects. Thus, one of the children errored out when getImage didn't find an image, which caused the combineLatest to terminate (an error in any input observable ends the combined stream).

I fixed it like this:

Observable.fromPromise(imageSrcPromise) // get initial image
.catch(() => Observable.empty()) // <-- fix 
.startWith(null) // and so forth...
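
A variant of the same idea, handled at the Promise level instead of in the stream, might look like the sketch below. This is not what I actually used; orNull is a hypothetical helper name, and it assumes that "no image" can be represented as null, the same value startWith already uses.

```typescript
// Hypothetical helper (not part of the original code): turn a rejected
// image lookup into a resolved null so the consuming stream never sees an error.
function orNull<T>(promise: Promise<T>): Promise<T | null> {
  // Promise.prototype.catch returns a new promise that resolves with the
  // handler's return value when the original promise rejects.
  return promise.catch(() => null);
}
```

The result could then be passed to Observable.fromPromise in place of imageSrcPromise. For an entry without an image the stream would emit null twice (once from startWith, once from the resolved promise) before moving on to the updates.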

The rejection explains why it sometimes emitted properly and sometimes didn't. If the requests for Domain objects with images finished first, the images would display. If a request for an image-less object finished first, the stream would end before the real images arrived.
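
The ordering effect can be reproduced without Firebase. What follows is a minimal sketch in plain TypeScript, not RxJS itself: Source, combineLatestModel and run are hypothetical names, and the model only mimics two behaviours of combineLatest (emit once every input has produced a value, and stop for good on the first error from any input).

```typescript
type Observer<T> = { next(value: T): void; error(err: unknown): void };

// Minimal push-based source with a single observer (stand-in for an Observable).
class Source<T> {
  private observer: Observer<T> | null = null;
  subscribe(observer: Observer<T>): void { this.observer = observer; }
  next(value: T): void { if (this.observer) this.observer.next(value); }
  error(err: unknown): void { if (this.observer) this.observer.error(err); }
}

// Simplified model of combineLatest: emits the latest values once every
// source has emitted, and terminates on the first error from any source.
function combineLatestModel<T>(sources: Source<T>[], out: Observer<T[]>): void {
  const latest: T[] = [];
  const seen = sources.map(() => false);
  let terminated = false;
  sources.forEach((source, i) => {
    source.subscribe({
      next(value) {
        if (terminated) return; // nothing gets through after an error
        latest[i] = value;
        seen[i] = true;
        if (seen.every(Boolean)) out.next(latest.slice());
      },
      error(err) {
        if (terminated) return;
        terminated = true;
        out.error(err);
      },
    });
  });
}

// Replays the two possible arrival orders for one entry with an image
// and one entry whose image lookup rejects.
function run(order: Array<"withImage" | "withoutImage">): string[] {
  const withImage = new Source<string | null>();
  const withoutImage = new Source<string | null>();
  const log: string[] = [];
  combineLatestModel([withImage, withoutImage], {
    next: pair => log.push("next " + JSON.stringify(pair)),
    error: err => log.push("error " + String(err)),
  });
  // Models startWith(null): every child emits null first.
  withImage.next(null);
  withoutImage.next(null);
  for (const which of order) {
    if (which === "withImage") withImage.next("a.png");
    else withoutImage.error("not found");
  }
  return log;
}

// Image resolves first: the real src is delivered before the error.
const imageFirst = run(["withImage", "withoutImage"]);
// Rejection arrives first: only the initial nulls are ever delivered.
const rejectionFirst = run(["withoutImage", "withImage"]);
```

In this model imageFirst contains a line with "a.png" before the error, while rejectionFirst contains only the initial [null,null] line followed by the error, which matches the mostly-nulls output I was seeing.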

Upvotes: 2
