Magnus

Reputation: 3722

Understanding merge in rxjs6 (and redux-observable specifically)

Context: I'm trying to use redux-observable with rxjs v6 to trigger multiple actions after fetching a resource.

Problem: I can trigger a single action after fetching the resource without difficulty, but my attempt to trigger multiple actions has me stumped. Specifically, my attempt (shown below) to use rxjs merge to trigger multiple actions seems so standard and so close to documented examples (see e.g. here) that I can't understand where I'm going wrong. Here is a breakdown of my code with TypeScript types indicated in comments; I've simplified things for the sake of clarity:

import { from } from "rxjs"; 
import { merge, mergeMap, map } from "rxjs/operators";
import { AnyAction } from "redux";

... //Setup related to redux-observable epics

function myEpic(
    action$: Observable<AnyAction>,
    state$: any,
    {fetchPromisedStrings}: any
) {
    return action$.pipe(
        ofType('MY_ACTION'),
        mergeMap(action => {
            const demoAction1: AnyAction = {type:'FOO1', payload:'BAR1'};
            const demoAction2: AnyAction = {type:'FOO2', payload:'BAR2'};

            const w = from(fetchPromisedStrings());    //const w: Observable<string[]>
            const x = w.pipe( map(() => demoAction1)); //const x: Observable<AnyAction>
            const y = w.pipe( map(() => demoAction2)); //const y: Observable<AnyAction>

            //My attempt to merge observables:
            const z = merge(x,y); // const z: OperatorFunction<{}, {} | AnyAction>

            // return x; // Works :)
            // return y; // Works :)
            return z;    // Doesn't work :(
        })
    );
}    

This code gives me the following error when I try to return z:

TypeError: You provided 'function (source) { return source.lift.call(_observable_merge__WEBPACK_IMPORTED_MODULE_0__["merge"].apply(void 0, [source].concat(observables))); }' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable

So what's the problem here? I'd expect merge in the above code to take the two Observable<AnyAction> types and return a single Observable<AnyAction> type (or some more general type that's compatible with Observable<AnyAction>, which my epic needs in order to function correctly). Instead, I get some opaque type called OperatorFunction<{}, {} | AnyAction> that, evidently, isn't compatible with Observable<AnyAction>. Could someone please unmuddle my thinking here? Is merge not the operator I am supposed to use here, or is my entire rxjs-v6 pattern wrong for the (seemingly) simple goal of triggering multiple actions after fetching a promised resource?

Upvotes: 3

Views: 1862

Answers (1)

m1ch4ls

Reputation: 3435

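Your code is fine, but you've imported the merge operator (from "rxjs/operators") instead of the merge observable creation function (from "rxjs"). The operator version is meant to be used inside pipe() and returns an OperatorFunction rather than an Observable, which is why mergeMap complains that it was given a function where a stream was expected.

Remove merge from your "rxjs/operators" import and import it from "rxjs" instead: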

import { merge } from 'rxjs';
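
To illustrate, here is a minimal sketch of the corrected pattern outside of an epic. It assumes that `of(["a", "b"])` is an acceptable stand-in for `from(fetchPromisedStrings())`; the `DemoAction` type and the `emitted` array are hypothetical names introduced only for this example:

```typescript
// The creation function comes from "rxjs"; only pipeable operators come from "rxjs/operators".
import { merge, of, Observable } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical action shape, standing in for redux's AnyAction.
interface DemoAction {
    type: string;
    payload: string;
}

// Assumption: `of` replaces from(fetchPromisedStrings()) so the sketch runs synchronously.
const w: Observable<string[]> = of(["a", "b"]);

const x = w.pipe(map((): DemoAction => ({ type: "FOO1", payload: "BAR1" })));
const y = w.pipe(map((): DemoAction => ({ type: "FOO2", payload: "BAR2" })));

// With the import from "rxjs", merge takes observables and returns an Observable.
const z: Observable<DemoAction> = merge(x, y);

// Collect the emitted action types to check that both actions come through.
const emitted: string[] = [];
z.subscribe(action => emitted.push(action.type));
```

Because `z` is now typed as `Observable<DemoAction>`, it can be returned from the `mergeMap` callback in the epic in the same way as `x` or `y`.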

Upvotes: 14
