keponk

Reputation: 320

Managing promises in RXJS observables

I've poked about SO and found many similar questions/answers, but I may be missing something in my basic understanding of how to work with this stack.

I'm working on a React Native project that uses RxJS observables. At some point I do file downloads, and that part is not a problem: a combination of the pre-existing axios-rxjs and react-native-file-system gets me where I want. The issue is that I'm not sure how to handle the follow-up work cleanly without async/await, which I understand is an anti-pattern inside an observable pipeline.

I want to transform this working code into a clean observable-style flow.

I've implemented an Epic that does the operations I want as such:

const myDownloadEpic = (
  action$,
  state$
) =>
  action$.pipe(
    ofType(myDownloadActionType), // catches the relevant action
    map(action => action.payload),
    mergeMap(payload =>
      downloadManager       // a class with the relevant utils to get files
        .download(payload) // this axios call returns my Blob file as Observable<Blob>
        .pipe(
          mergeMap(async response => {

            // a promise is returned by RNFS to read the contents of a folder
            const files = await RNFS.readDir(RELEVANT_TARGET_PATH) 
...
            // a promise returned from a custom function that converts my blob to base64
            const data = await convertToBase64(response) 

            // another promise returned by RNFS to write to the file system
            await RNFS.writeFile(FULL_PATH, data, 'base64');

...
          })
        )
    )
   )

I've tried splitting this into several pipes. For example, I tried moving the READ operation into an earlier pipe, but the result ends up looking very verbose. Is there no clean, simple way to "hold" until the promises are done so I can make decisions based on their results?

What would be considered cleaner in this situation?

Upvotes: 1

Views: 1896

Answers (2)

Mrk Sef

Reputation: 8022

You can try something like this. It should be roughly equivalent to what you've written above.

const myDownloadEpic = (
  action$,
  state$
) => action$.pipe(
  ofType(myDownloadActionType),
  map(action => action.payload),

  mergeMap(payload => downloadManager.download(payload)),

  mergeMap(response => from(RNFS.readDir(RELEVANT_TARGET_PATH)).pipe(
    map(files => ({response, files}))
  )),

  mergeMap(values => from(convertToBase64(values.response)).pipe(
    map(data => ({...values, data}))
  )),

  mergeMap(({response, files, data}) => RNFS.writeFile(FULL_PATH, data, 'base64'))
);

Upvotes: 2

Joshua McCarthy

Reputation: 1852

The from() function converts a promise into an observable that emits the resolved value and then completes.
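As a minimal sketch of that behavior (listFiles is a hypothetical stand-in for a promise-returning call such as RNFS.readDir(), not part of the original code), the observable below emits exactly once and then completes:

```typescript
import { from } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical stand-in for a promise-returning call such as RNFS.readDir().
const listFiles = (): Promise<string[]> => Promise.resolve(["a.txt", "b.txt"]);

// from() wraps the promise: one emission when it resolves, then completion.
const fileCount$ = from(listFiles()).pipe(map(files => files.length));

const received: number[] = [];
// Resolves when the observable completes, so callers can inspect `received`.
const finished = new Promise<void>((resolve, reject) =>
  fileCount$.subscribe({
    next: count => received.push(count),
    error: reject,
    complete: () => resolve(),
  })
);

// By the time `finished` resolves, `received` holds the single value 2.
finished.then(() => console.log(received));
```

Note that the promise starts running as soon as listFiles() is called, not when the observable is subscribed to.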

If you need to wait until all promises are resolved, I recommend forkJoin() as it won't emit a value until all observables complete.
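A small sketch of forkJoin() in isolation, assuming two hypothetical promise-returning helpers (readNames and encode are placeholders, not real RNFS functions). It emits a single object once both inner observables have completed:

```typescript
import { forkJoin, from } from "rxjs";

// Hypothetical promise-returning helpers standing in for the real I/O calls.
const readNames = (): Promise<string[]> => Promise.resolve(["a.txt", "b.txt"]);
const encode = (text: string): Promise<string> =>
  Promise.resolve(text.toUpperCase());

// forkJoin waits for every source to complete, then emits their last values
// once, keyed by the property names used here.
const combined$ = forkJoin({
  files: from(readNames()),
  data: from(encode("payload")),
});

// Bridge to a promise only so the result can be inspected outside the stream.
const result = new Promise<{ files: string[]; data: string }>(
  (resolve, reject) => combined$.subscribe({ next: resolve, error: reject })
);

result.then(({ files, data }) => console.log(files.length, data));
```

If any of the sources errors, forkJoin errors as well, so a catchError() on the inner observables is worth considering when one failure should not cancel the rest.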

Lastly, to make the code a little cleaner, I would also recommend declaring separate variables/functions to define your observables for each promise.

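// defer() postpones readDir() until subscription, so the folder is re-read for every download.
// from(RNFS.readDir(...)) at this level would call readDir() once and replay that one result.
const files$ = defer(() => RNFS.readDir(RELEVANT_TARGET_PATH));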
const getData = (response: unknown) => from(convertToBase64(response)).pipe(
  mergeMap(data=>
    from(RNFS.writeFile(FULL_PATH, data, 'base64')).pipe(
      mapTo(data)
    )
  )
);

const myDownloadEpic = (action$, state$) =>
  action$.pipe(
    ofType(myDownloadActionType),
    map(({ payload }) => payload),
    mergeMap(payload => downloadManager.download(payload)),
    mergeMap(response =>
      forkJoin({
        files: files$,
        data: getData(response)
      })
    ),
    map(({ files, data }) => {
      // check something with `files` and `data`
    })
  );

I'm assuming RNFS.writeFile() resolves with no value, so I run it as a side effect inside getData(). The mapTo() operator ignores whatever the source observable emits and emits the value you pass to it instead, which lets getData() pass the base64 data along after the write finishes.
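To illustrate that last point on its own, here is a hedged sketch in which writeFile is a hypothetical in-memory stand-in for RNFS.writeFile() and the path is made up. It uses map(() => data), which behaves the same as mapTo(data) and is the form recommended by recent RxJS versions, where mapTo() is marked as deprecated:

```typescript
import { from, Observable } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

// Hypothetical in-memory stand-in for a void-returning write such as RNFS.writeFile().
const written: Record<string, string> = {};
const writeFile = (path: string, data: string): Promise<void> => {
  written[path] = data;
  return Promise.resolve();
};

// Perform the write, then re-emit the data instead of the write's void result.
const saveAndKeep = (path: string, data: string): Observable<string> =>
  from(writeFile(path, data)).pipe(map(() => data));

// Bridge to a promise only so the emitted value can be inspected afterwards.
const saved = new Promise<string>((resolve, reject) =>
  from(Promise.resolve("aGVsbG8="))
    .pipe(mergeMap(data => saveAndKeep("/tmp/example.b64", data)))
    .subscribe({ next: resolve, error: reject })
);

saved.then(data => console.log(data === written["/tmp/example.b64"]));
```

Downstream operators receive the data only after the write promise has resolved, which is the "hold until done" behavior the question asks about.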

Upvotes: 2
