James T.

Reputation: 978

What's the best way to create a RxJS Observable out of objects that are yielded from a node.js stream pipeline?

I was able to implement this, but I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable, recordObservable.

setup to run the source file

npm init -y
npm i [email protected] byline

source file that does what I want, but in a confusing way

// node.js 14
const fs = require('fs');
const pipeline = require('util').promisify(require('stream').pipeline);

const byline = require('byline');
const { Observable } = require('rxjs');
const { take } = require('rxjs/operators');

const sleep = ms => new Promise(r => setTimeout(r, ms));
let recordObservable;

(async () => {
  const inputFilePath = 'temp.csv';

  try {
    const data = 'a,b,c\n' +
      '1,2,3\n' +
      '10,20,30\n' +
      '100,200,300';

    fs.writeFileSync(inputFilePath, data);

    console.log('starting pipeline');
    // remove this line, and the `await pipeline` resolves, but process exits early?
    processAfterDeclaration().catch(console.error);
    
    await pipeline(
      fs.createReadStream(inputFilePath),
      byline.createStream(),
      async function* (sourceStream) {
        console.log('making observable', inputFilePath);

        recordObservable = new Observable(async subscriber => {
          for await (const lineBuffer of sourceStream) {
            subscriber.next(lineBuffer.toString());
          }
          subscriber.complete();
        });

        console.log('made observable', recordObservable);
      }
    );

    console.log('pipeline done', recordObservable);
  } catch (error) {
    console.error(error);
  } finally {
    fs.unlinkSync(inputFilePath);
  }
})();

async function processAfterDeclaration() {
  while (!recordObservable) {
    await sleep(100);
  }

  console.log('can process');
  // subscribe() returns a Subscription, not a promise, so this await does not wait for completion
  await recordObservable
    .pipe(take(2))
    .subscribe(console.log);
}

edit: It may be better to just forgo node.js stream.pipeline. I'd think using pipeline is best because it should be the most efficient and offers backpressure, but I want to test some things offered by RxJS.

edit2: Another reason to forgo stream.pipeline is that I can still use RxJS pipe methods and pass any readable stream to the from function as an argument. I can then use the subscribe method to write or append each value from the observable to my output stream, and call add on my subscription to register teardown logic, specifically for closing my write stream. I would hope that RxJS from helps determine when to close the read stream it's given as input. Finally, to wait for the observable to finish, I would recommend await lastValueFrom(myObservable), or possibly firstValueFrom.

Upvotes: 0

Views: 1197

Answers (2)

Mrk Sef

Reputation: 8022

RxJS from operator

The RxJS from operator will turn an async iterable (such as a Node.js readable stream) into an observable for you! Support for async iterables was added in RxJS 7.

I can't run or test your code, but something in this ballpark should work.

const fs = require('fs');

const byline = require('byline');
const { from } = require('rxjs');
const { map, take, finalize } = require('rxjs/operators');

const inputFilePath = 'temp.csv';

(async () => {
  const data = 'a,b,c\n' +
    '1,2,3\n' +
    '10,20,30\n' +
    '100,200,300';

  fs.writeFileSync(inputFilePath, data);

  console.log('starting pipeline');

  from(byline(fs.createReadStream(inputFilePath)))
    .pipe(
      map(lineBuffer => lineBuffer.toString()),
      take(2),
      finalize(() => fs.unlinkSync(inputFilePath))
    )
    .subscribe(console.log);
})();

Your second async function

I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable

If you define a function and never call it, that function will never calculate anything.

If you define an observable and never subscribe to it, that observable will never do anything either. That's different from promises, which start their work the moment they're created. You just need to subscribe to that observable; it doesn't need a separate function.

This should work the same:

recordObservable = new Observable(async subscriber => {
  for await (const lineBuffer of sourceStream) {
    subscriber.next(lineBuffer.toString());
  }
  subscriber.complete();
});

recordObservable.pipe(
  take(2)
).subscribe(console.log)

Upvotes: 1

James T.

Reputation: 978

The second async function

I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable

The logic error was that the await pipeline would never resolve or reject: the third step in the pipeline never yielded anything, because nothing ever subscribed to recordObservable and pulled from the source stream. It was an accidental deadlock. A pending promise alone does not keep a Node.js process alive, so once no other work remained, the process exited with code 0.
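The exit code 0 can be reproduced in isolation, without streams or RxJS. A minimal sketch using only Node.js built-ins: a child process awaits a promise that never settles, and because no timers or I/O remain, the child exits normally. The child script and the helper name runChild are made up for illustration.

```javascript
const { spawnSync } = require('child_process');

// Child script: an async function awaits a promise that never settles.
const childScript = `
(async () => {
  console.log('before await');
  await new Promise(() => {}); // never resolves or rejects
  console.log('after await');  // never reached
})();
`;

// Runs the child script in a separate Node.js process and reports
// its exit status and whatever it printed to stdout.
function runChild() {
  const result = spawnSync(process.execPath, ['-e', childScript], {
    encoding: 'utf8'
  });
  return { status: result.status, stdout: result.stdout };
}
```

Calling runChild() should report status 0 with only 'before await' printed, which matches what happens in the question when nothing subscribes to the observable.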

Upvotes: 0
