noone000

Reputation: 157

How to implement an async generator to stream with a Node Readable stream?

I want to do something like this:

const { Readable } = require("stream");

function generatorToStream(generator) {
  return new Readable({
    read() {
      (async () => {
        for await (const result of generator()) {
          if (result.done) {
            this.push(null);
          } else {
            this.push(result.value);
          }
        }
      })();
    }
  });
}

generatorToStream(async function*() {
  const msg1 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg1;
  const msg2 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg2;

  const msg3 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg3;
}).pipe(process.stdout);

but it's not working: the end event is never emitted and I never receive any data in my terminal.

Any solutions or tips on how to implement this?

Upvotes: 5

Views: 2842

Answers (2)

raisedbywolves

Reputation: 191

Very basic working sample below:

import { Readable } from 'stream';

const generator = async function* () {
  yield 1;
  yield 2;
  yield 3;
};

const myReadableStream = Readable.from(generator());
myReadableStream.on('data', (data) => console.log(data));
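
The same approach should cover the original use case as well. Here is a minimal sketch that pipes to process.stdout, mirroring the question's 2-second delays (the messages name is just illustrative; this assumes the generator yields strings, since process.stdout only accepts strings or buffers):

import { Readable } from 'stream';

const messages = async function* () {
  // yield a string every 2 seconds, as in the question
  for (let i = 0; i < 3; i++) {
    yield await new Promise((resolve) =>
      setTimeout(() => resolve('ola amigao\n'), 2000)
    );
  }
};

// Readable.from consumes the async iterator and ends the stream
// automatically once the generator finishes
Readable.from(messages()).pipe(process.stdout);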

Upvotes: 5

Michał Karpacki

Reputation: 2658

I'm the author of Scramjet, a functional stream processing framework that may be an easy solution for you.

If you're OK with adding a total of just 3 dependencies to your project, then it couldn't be easier:

const {StringStream} = require("scramjet");

StringStream.from(async function* () {
    yield await something();
    ...
});
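
For instance, applied to the question's case it might look like this (a sketch, assuming StringStream.from accepts an async generator function as shown above and returns a regular Node stream you can pipe to process.stdout):

const { StringStream } = require("scramjet");

StringStream.from(async function* () {
    // yield a string after 2 seconds, as in the question
    yield await new Promise(resolve =>
        setTimeout(() => resolve("ola amigao\n"), 2000)
    );
    yield await new Promise(resolve =>
        setTimeout(() => resolve("ola amigao\n"), 2000)
    );
}).pipe(process.stdout);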

If you want to implement this on your own, take a look at the source code of DataStream around line 112 - it should be fairly easy to follow. In general, you'd need something like this:

const { PassThrough } = require("stream");

function generatorToStream(gen) {
    // obtain an async iterator by calling the async generator function
    const iter = gen();
    // create your output
    const out = new PassThrough();

    // this IIFE will do all the work
    (async () => {
        for await (const chunk of iter) {
            // if write returns true, continue; otherwise wait until out is drained
            if (!out.write(chunk)) await new Promise(res => out.once("drain", res));
        }
        // end the stream once the generator is exhausted
        out.end();
    })()
        // handle errors by pushing them to the stream, for example
        .catch(e => out.emit('error', e));

    // return the output stream
    return out;
}

The above example is more or less what happens in Scramjet - there's a little more optimization there, such as keeping fewer event handlers, but the above should work well in a simple case.
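
Used with the generator from the question, the helper would be called like this (a sketch, relying on the version above that ends the output stream when the generator completes):

generatorToStream(async function* () {
    yield await new Promise(resolve =>
        setTimeout(() => resolve("ola amigao\n"), 2000)
    );
    yield await new Promise(resolve =>
        setTimeout(() => resolve("ola amigao\n"), 2000)
    );
}).pipe(process.stdout);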

Upvotes: 2
