David
David

Reputation: 266

Conditional RxJS Stream

resource1$ = hash1$.map( (renew: boolean) => renew ? http1$ : Observable.empty() );
resource2$ = hash2$.map( (renew: boolean) => renew ? http2$ : Observable.empty() );

sync$ = Observable.forkJoin(resource1$, resource2$);

sync$.subscribe( () => console.log('Sync done!), (err) => console.log('Sync failed!') );

Hello, I have multiple resources to sync from an API when my application start. I want to sync them in parallel and check if it's needed to sync them using a HEAD request before and compare X-HASH header with an old stored.

So hash1$ do a HEAD request, compare hashes and return true or false.

I'm stuck because, if resource1$ return Observable.empty, sync$ cancel all streams... And i don't understand why.

Upvotes: 0

Views: 509

Answers (1)

martin
martin

Reputation: 96891

The forkJoin requires all source Observables to emit at least one item and complete. If you use Observable.empty() you're sending just the complete notification so that's why forkJoin never emits.

You can do for example:

resource1$ = hash1$.map((renew: boolean) => renew ? http1$ : Observable.of(false));
resource2$ = hash2$.map((renew: boolean) => renew ? http2$ : Observable.of(false));

sync$ = Observable.forkJoin(resource1$, resource2$)
  .filter(results => results[0] && results[1]); // Or whatever condition you want

Upvotes: 2

Related Questions