codeepic

Reputation: 4102

How to change stream interval dynamically when another stream emits

So I have a stream made of an array of strings.

const msgs = ['Searching...', 'Gathering the Data...', 'Loading the Dashboard...'];
const msgs$ = Observable.from(msgs);

I need to emit these messages one at a time, in order, every 3 seconds until the dataFetched$ stream emits, which means all the data has arrived on the page. The messages must not cycle: if the last message is showing and the data hasn't arrived yet, it should stay as it is.

I can step through the strings in msgs over time with the RxJS zip function:

const longInt$: Observable<number> = Observable.interval(3000);
const longMsgsOverTime$ = Observable.zip(msgs$, longInt$, msg => msg);

Then, when the dataFetched$ stream emits, I need to switch to shortInt$, which is:

const shortInt$: Observable<number> = Observable.interval(1500);

and continue displaying the remaining loading messages until the last one is visible. Each message should be shown only once.

It's quite mind-bending for me: I can tick off some of the requirements but can't get it fully working. After many tries I came to the conclusion that we need to wrap the msgs array in a Subject to prevent cycling through all of the messages again after switching from longInt$ to shortInt$.

===================EDIT=====================

Following ggradnig's answer and code, I concocted this ugliness (for debugging purposes):

setLoaderMsg() {
    console.log('%c setting loader msg', 'border: 10px dotted red;');
    const msgs$ = Observable.from(['Searching...', 'Gathering the Data...', 'Loading the Dashboard...', 'Something something...', 'yet another msg...', 'stop showing me...']),
        shortInt$ = Observable.interval(250).pipe(
            tap(v => console.log('%c short v', 'background: green;',v))
        ),
        longInt$ = Observable.interval(3000).pipe(
            tap(v => console.log('%c long v', 'background: red;',v)),
            takeUntil(this.dataFetched$)
        ),
        // interval$ = Observable.interval(3000).pipe(takeUntil(this.dataFetched$)).pipe(concat(Observable.interval(250))),
        interval$ = longInt$.pipe(concat(shortInt$));

    Observable.zip(msgs$, interval$, msg => msg)
    // Observable.zip(msgs$, interval$)
        .pipe(tap(v => {
            console.log('%c v', 'background: black; border: 1px solid red; color: white;', v);
            this.loaderMsg = v;
        }))
        .subscribe(
            () => {},                       //next
            () => {},                       //error
            // () => this.loading = false       //complete
            () => {
                console.log('%c complete', 'background: yellow; border: 2px solid red');
                // this.loading = false;
            }
        );
}

Below is a screengrab of my Chrome console showing what happens. The shortInt$ stream should log messages with a green background, but as you can see that never happens, even though the dataFetched$ stream emitted (orange dashed border). Below the "fetch data emit" log we should see the green messages that would show shortInt$ is emitting values.

[Screenshot: Chrome console output]

================================= 2nd EDIT ========================

Below is dataFetched$ observable:

fetchData(){
    console.log('%c fetching now', 'border: 2px dashed purple');

    // this.initLoader();

    this.dataFetched$ = Observable.combineLatest([
        this.heroTableService.getHeroTableData([SubAggs.allNetworks.key], AggFieldsMap.legends.name),
        this.researchService.benchmark(this.getSubAggsForOverviewTbl()),
        this.multiLineChartService.getEngagementOverTime(this.getSubAggsForNetworkEngagementsOverTimeTable())
    ]).pipe(
        share(),
        delay(7000),
        tap(v => {
            console.log('%c fetch data emit', 'border: 2px dashed orange', v);
        })
    );


    const sub = this.dataFetched$.subscribe(([heroTableRes, benchmarkRes, netByEngRes]: [ITable[], IBenchmarkResponse, any]) => {
         // this.loading = false; ==> moved to dashboard

         //xavtodo: split this shit into .do() or .tap under each http request call
         //hero table logic here
         this.heroTableData = heroTableRes;

         //////////////////////////////////////////////////////
         //engagement-by-network-table
         this.engagementByNetworkData = this.researchService.extractDataForEngagementByNetworkTable(benchmarkRes, this.getSubAggsForOverviewTbl());

         //////////////////////////////////////////////////////
         //network eng over time logic here MULTI LINE CHART
         this.engagementMultiLineData = this.multiLineChartService.extractEngagementData(netByEngRes, this.getSubAggsForNetworkEngagementsOverTimeTable());
         this.networks = this.multiLineChartService.getNetworks();
         this.multilineChartEngagements = Util.getNetworksWithAltNames(this.networks);
         this.publishedContentData = this.multiLineChartService.extractPublishedData(netByEngRes);

         //combined multiline chart
         this.multiLineChartData = {
             publishedData: this.publishedContentData,
             engagementData: this.engagementMultiLineData
         };
     });

    this.addSubscription(sub);
}

Upvotes: 4

Views: 577

Answers (1)

ggradnig

Reputation: 14169

Sounds like a task for takeUntil and concat.

The first observable is longInt$. It completes as soon as dataFetched$ emits. For this requirement, you can use takeUntil. It takes another observable as a parameter and once that observable emits, the source observable will complete.

From the docs:

Emits the values emitted by the source Observable until a notifier Observable emits a value.

So, the first step would be:

const longInt$: Observable<number> = Observable.interval(3000)
    .pipe(takeUntil(dataFetched$));

Next, you want to concat your other observable, shortInt$. Concat will let the first observable complete, then let the second observable take over.

From the docs:

Creates an output Observable which sequentially emits all values from given Observable and then moves on to the next.

This would look something like this:

const longMsgsOverTime$ = Observable.zip(msgs$, 
    longInt$.pipe(concat(shortInt$)), msg => msg);
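
To sanity-check the timing this combination produces, here is a rough model in plain TypeScript. It is not RxJS code, just the arithmetic: long ticks count until the fetch notification, then short ticks take over, and zip pairs the n-th message with the n-th tick. The function name and its parameters are hypothetical, and the model assumes the notification does not land exactly on a long tick.

```typescript
// Hypothetical helper (not part of RxJS): returns the time in ms at which
// each message would be shown by
//   zip(msgs$, longInt$.pipe(takeUntil(dataFetched$), concat(shortInt$)))
function messageTimeline(
  messageCount: number,
  longMs: number,
  shortMs: number,
  fetchedAtMs: number,
): number[] {
  const times: number[] = [];

  // Long ticks fire at longMs, 2 * longMs, ... as long as they come
  // before the fetch notification (takeUntil completes the long interval).
  for (let t = longMs; t < fetchedAtMs && times.length < messageCount; t += longMs) {
    times.push(t);
  }

  // concat subscribes to the short interval at the moment of the
  // notification, so its first tick is one short period after that.
  for (let t = fetchedAtMs + shortMs; times.length < messageCount; t += shortMs) {
    times.push(t);
  }

  return times;
}
```

With three messages, a 3000 ms long interval, a 1500 ms short interval and data arriving at 7000 ms, the model puts the messages at 3000, 6000 and 8500 ms. Note that with zip the first message only appears after the first tick, not immediately.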

Note: If you are using a version of RxJS older than 5.5 (the release that introduced pipe), replace the pipe(...) calls with the corresponding prototype operators. For example, .pipe(concat(...)) becomes .concat(...)
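
For RxJS 6 and later, the same idea can be written with the creation functions imported from 'rxjs' instead of the static Observable.* methods. The following is only a sketch under that assumption: loaderMessages, demoTimeline and the optional scheduler parameter are names made up for illustration, and the virtual-time scheduler is there solely so the timing can be checked without actually waiting.

```typescript
import {
  Observable, SchedulerLike, VirtualTimeScheduler,
  asyncScheduler, concat, from, interval, timer, zip,
} from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

// Hypothetical helper: emits each message once, paced by the long interval
// until dataFetched$ emits and by the short interval afterwards.
function loaderMessages(
  msgs: string[],
  dataFetched$: Observable<unknown>,
  longMs = 3000,
  shortMs = 1500,
  scheduler: SchedulerLike = asyncScheduler, // assumption: injectable for testing
): Observable<string> {
  const longInt$ = interval(longMs, scheduler).pipe(takeUntil(dataFetched$));
  const shortInt$ = interval(shortMs, scheduler);

  // zip pairs the n-th message with the n-th tick; once the last message
  // has been emitted the zipped stream completes, so nothing cycles.
  return zip(from(msgs), concat(longInt$, shortInt$)).pipe(
    map(([msg]) => msg),
  );
}

// Demo in virtual time: the "data" arrives after 7000 virtual milliseconds.
function demoTimeline(): Array<[number, string]> {
  const scheduler = new VirtualTimeScheduler();
  const seen: Array<[number, string]> = [];
  const msgs = ['Searching...', 'Gathering the Data...', 'Loading the Dashboard...'];

  loaderMessages(msgs, timer(7000, scheduler), 3000, 1500, scheduler)
    .subscribe(msg => { seen.push([scheduler.now(), msg]); });

  scheduler.flush(); // runs every scheduled tick synchronously
  return seen;
}
```

In a component you would drop the scheduler argument and pass your own dataFetched$, for example loaderMessages(msgs, this.dataFetched$).subscribe(msg => this.loaderMsg = msg).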

Here is an example on StackBlitz: https://stackblitz.com/edit/angular-mtyfxd

Upvotes: 1
