peterc

Reputation: 7843

How to use an async call inside an RxJS Observable constructor

I am setting up an effect in an application using NgRx.

During the effect, I need to get some store values, do some logic, and then dispatch one or more actions as a result of the logic I run.

So, to make it easier, I am using an effect with dispatch: false.

public getCachedOrServerValue$ = createEffect(() =>
    this.actions$.pipe(
      ofType(myActions.getCachedOrServerValue),
      switchMap((_) => this.getCachedOrServerValue())
    ), { dispatch: false }
);

In the code above I call getCachedOrServerValue. I want to use switchMap (rather than just tap) so that if a new myActions.getCachedOrServerValue arrives before the previous one has finished, the previous subscription is cancelled (as far as I understand switchMap).
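
The cancellation behaviour described above can be sketched in isolation. The `requests` and `inner` subjects below are hypothetical stand-ins for the action stream and the per-action work; they are not part of the original effect.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-in for the action stream.
const requests = new Subject<'first' | 'second'>();

// Hypothetical inner streams, one per request, driven by hand.
const inner = {
  first: new Subject<string>(),
  second: new Subject<string>(),
};

const received: string[] = [];

requests
  .pipe(switchMap((key) => inner[key]))
  .subscribe((value) => received.push(value));

requests.next('first');
requests.next('second'); // switchMap unsubscribes from inner.first here

inner.first.next('stale');  // dropped: switchMap is no longer subscribed
inner.second.next('fresh'); // delivered

console.log(received); // contains only 'fresh'
```

Only the value from the most recent inner stream reaches the subscriber; the earlier inner subscription is torn down as soon as the second request arrives.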

The function that we call looks like the following:

private getCachedOrServerValue(): Observable<void> {
  const result$ = new Observable<void>(async (subscriber) => { // <---- I need this to be async so I can use await
    // First see if we can get it from the store
    const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
    if (cached) {
      this.store$.dispatch(myActions.getCachedOrServerValueSuccess(cached));
      subscriber.next();
      return;
    }

    // Need to request from the server
    const name = await this.storeUtils.getStoreValue(fromApp.getSiteName);

    // storeUtils.getStoreValue is just a helper we have to get a single value as a Promise
    const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
    const serverValue = await this.myService.getValue(name, valueId);
    this.store$.dispatch(myActions.storeValue(serverValue));
    this.store$.dispatch(myActions.getCachedOrServerValueSuccess(serverValue));
  });
  return result$;
}

First, I wrap the logic in an Observable because, as far as I remember, switchMap needs an Observable to be returned from its callback.

Second (my actual problem), I want to be able to use await inside the function passed to the Observable constructor, but VSCode is reporting an error.

Error details:

Argument of type '(this: Observable, subscriber: Subscriber) => Promise' is not assignable to parameter of type '(this: Observable, subscriber: Subscriber) => TeardownLogic'. Type 'Promise' is not assignable to type 'TeardownLogic'. Property 'unsubscribe' is missing in type 'Promise' but required in type 'Unsubscribable'. ts(2345)

types.d.ts(31, 5): 'unsubscribe' is declared here.
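
For context on the error: the function passed to `new Observable(...)` is expected to return teardown logic (a function, an object with `unsubscribe`, or nothing), whereas an `async` function always returns a `Promise`. One possible workaround, sketched here on the assumption that a hand-built Observable is really wanted, keeps the outer function synchronous and moves the awaits into an inner async function. The helper name `fromAsyncWork` is hypothetical.

```typescript
import { Observable } from 'rxjs';

// Hypothetical helper: wraps async work in an Observable without making
// the subscribe function itself async.
function fromAsyncWork<T>(work: () => Promise<T>): Observable<T> {
  return new Observable<T>((subscriber) => {
    let cancelled = false;

    // The awaits live in an inner async function; its promise is not returned.
    const run = async (): Promise<void> => {
      try {
        const value = await work();
        if (!cancelled) {
          subscriber.next(value);
          subscriber.complete();
        }
      } catch (err) {
        if (!cancelled) {
          subscriber.error(err);
        }
      }
    };
    void run();

    // Teardown: a plain function, which satisfies TeardownLogic.
    return () => {
      cancelled = true;
    };
  });
}

// Usage: the work starts only when someone subscribes.
let calls = 0;
const value$ = fromAsyncWork(async () => {
  calls += 1;
  return 42;
});
const subscription = value$.subscribe((value) => console.log('got', value));
```

The teardown only flips a flag, so work that has already started still runs to completion; its result is simply not emitted.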

What is the problem here, and how can I use async/await as shown above?

Update

Following @StPaulis's answer, I can avoid having to do this (at least in this situation).

I can do it like the following (not tested yet, but I am sure it will be fine):

public getCachedOrServerValue$ = createEffect(() =>
  this.actions$.pipe(
    ofType(myActions.getCachedOrServerValue),
    switchMap(_ =>
      from(this.getCachedOrServerValue()).pipe( // <-- use a from here
        map(data => myActions.getCachedOrServerValueSuccess(data)),
        catchError(error => of(myActions.getCachedOrServerValueFail(error)))
      )
    )
  )
);

private async getCachedOrServerValue(): Promise<DataState> {
  // First see if we can get it from the store
  const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
  if (cached) {
    return cached;
  }

  // Need to request from the server
  const name = await this.storeUtils.getStoreValue(fromApp.getName);
  const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
  const serverValue = await this.myService.getValue(name, valueId);
  return serverValue;
}

Upvotes: 6

Views: 1247

Answers (1)

StPaulis

Reputation: 2916

In my opinion, mixing observables with promises is never a good idea.

I suggest using RxJS's from to convert your promise to an Observable and using it inside your pipe, instead of creating a new Observable and wrapping all the logic inside it.
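
A minimal sketch of this suggestion outside NgRx, assuming a promise-returning function. `loadValue`, the `Result` type, and the `trigger$` subject are hypothetical stand-ins for the asker's lookup, actions, and action stream.

```typescript
import { Observable, Subject, from, of } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';

// Hypothetical result shape standing in for success/fail actions.
type Result =
  | { type: 'success'; value: number }
  | { type: 'fail'; error: unknown };

// Hypothetical stand-in for the promise-returning lookup.
async function loadValue(id: number): Promise<number> {
  if (id < 0) {
    throw new Error('invalid id');
  }
  return id * 2;
}

// Hypothetical stand-in for the action stream.
const trigger$ = new Subject<number>();

const result$: Observable<Result> = trigger$.pipe(
  switchMap((id) =>
    from(loadValue(id)).pipe(
      map((value): Result => ({ type: 'success', value })),
      // Catch inside the inner pipe so a failure does not end the outer stream.
      catchError((error: unknown) => {
        const failure: Result = { type: 'fail', error };
        return of(failure);
      })
    )
  )
);

const results: Result[] = [];
result$.subscribe((result) => results.push(result));

trigger$.next(-1); // superseded by the next trigger before it settles
trigger$.next(21); // only this request's outcome is collected
```

Note that switchMap only stops listening to the earlier promise; the promise itself still runs to completion, so any side effects inside it are not cancelled. switchMap also accepts a promise directly, so from is mainly useful when the inner result needs its own pipe, as with map and catchError here.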

Upvotes: 1
