sampathsris
sampathsris

Reputation: 22270

How to use an observable within `filter`?

Suppose I have a directory structure similar to following:

foo
|
+---one
|   +---tmp
|
+---two
|
+---three
    |
    +---tmp

I want to get the list of sub-directories under foo which has a tmp (sub-)sub-directory underneath. I do have the following code which does that:

const { bindNodeCallback, from } = require('rxjs');
const { map, flatMap, filter } = require('rxjs/operators');
const { readdir, access, accessSync, constants: { F_OK, R_OK }} = require('fs');
const { join } = require('path');

const readdirRx = bindNodeCallback(readdir);

const FOO = './foo';

const result = readdirRx(FOO)
    .pipe(
        // readdir will return an array of files. Convert each
        // file into an observable, and then flatten it.
        flatMap(from),

        // get rid of directories that do not have a `tmp` dir underneath
        filter(dir => {
            try {
                accessSync(join(FOO, dir, 'tmp'), F_OK | R_OK);
                return true;
            } catch (err) {
                return false;
            }
        })
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// outputs (correctly):
//   one
//   three
//   Done!

However, this code looks awful to me because (a) I am using exceptions for control flow, and (b) there is a synchronous accessSync call, both of which are detrimental to performance.

I do have the following reactive piece of code that checks the same thing:

const fileExistsAndReadableRx = bindNodeCallback(
    // check if file exists and readable
    (file, cb) => access(file, F_OK | R_OK,
        // call the callback with true if no errors found
        err => cb(!err)
    )
);

However, I cannot figure out how I can plug fileExistsAndReadableRx into the above program. My objective is to remove the try-catch block and use fileExistsAndReadableRx instead to filter out sub-directories that do not have a tmp (sub-)sub-directory. How can I do that?

(Please note that the actual task I am trying to do is not reading the disk. What I am trying to do is a complex async operation, and I had to come up with a simpler example to illustrate my problem).

What I have tried so far:

I tried to use map, but it emits a stream of Observables, as one would expect:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        map(dir =>
            fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
        )
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Done!

So I thought, I know! I'd use flatMap to flatten the Observables. That should produce three boolean values that are eventually emitted from those Observables. But that did not work either. It only emits a single value:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        flatMap(dir =>
            fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
        )
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   true
//   Done!

Edit:

I tried @IngoBurk's answer, but that produces a single boolean value. Not a list of strings as I would expect:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
            .pipe(
                filter(Boolean),
                map(() => dir),
            )
        ),
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   true
//   Done!

Upvotes: 2

Views: 167

Answers (1)

Ingo Bürk
Ingo Bürk

Reputation: 20033

You can do something like this:

readdirRx(FOO)
  .pipe(
    flatMap(from),
    flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
      .pipe(
        catchError(res => of(res)),
        filter(Boolean),
        map(() => dir),
      )
    ),
  )

It assumes that fileExistsAndReadableRx returns Observable<boolean>. filter(Boolean) is just short for filter(v => !!v).

The trick here is essentially what you tried, which is (flat-)mapping each directory to the observable checking for the tmp folder. We then use that result to filter out those not matching it and then map it back to the directory rather than that intermediate result.

You can see it in action here: https://rxviz.com/v/d8djbkRO

you'd probably want to use cb(null, !err) to invoke the callback in fileExistsAndReadableRx

Upvotes: 2

Related Questions