Reputation: 157
I want to do something like this:
const { PassThrough, Readable } = require("stream");
/**
 * Adapts an async generator function into a Readable stream.
 *
 * The generator is invoked once, on the first read, and a single value is
 * pulled from it per `read()` call. When the generator finishes, `null` is
 * pushed so the stream emits `end`; if it throws, the stream is destroyed
 * with that error.
 *
 * @param {Function} generator - Function returning an (async) iterator whose
 *   values are valid stream chunks (strings or Buffers in the default mode).
 * @returns {Readable} Stream producing the generator's values in order.
 */
function generatorToStream(generator) {
  // One iterator shared by every read() call; creating it inside read()
  // without this guard would restart the generator on each call.
  let iterator = null;

  // Pulls exactly one value and hands it to the stream. Never rejects:
  // any failure is routed to stream.destroy().
  const pullNext = async (stream) => {
    try {
      if (iterator === null) {
        iterator = generator();
      }
      const { value, done } = await iterator.next();
      // push(null) signals end-of-stream to the consumer.
      stream.push(done ? null : value);
    } catch (err) {
      stream.destroy(err);
    }
  };

  return new Readable({
    read() {
      // read() is not called again until push() has been called, so one
      // pull per call is enough and keeps back-pressure intact.
      pullNext(this);
    },
  });
}
// Demo: yield the same greeting three times, waiting two seconds before each
// one, and pipe the resulting stream to standard output.
generatorToStream(async function* () {
  for (let round = 0; round < 3; round += 1) {
    yield await new Promise((resolve) => {
      setTimeout(() => resolve("ola amigao"), 2000);
    });
  }
}).pipe(process.stdout);
But it's not working: the `end` event is never emitted and I don't receive any data in my terminal.
Any solution or tips on how to implement it?
Upvotes: 5
Views: 2842
Reputation: 191
Very basic working sample below:
import { Readable } from 'stream';
// Async generator function that yields the numbers 1, 2 and 3 in order.
const generator = async function* () {
  for (const value of [1, 2, 3]) {
    yield value;
  }
};
// Wrap the generator's iterator in a Readable stream and log every chunk.
const myReadableStream = Readable.from(generator());
myReadableStream.on('data', (chunk) => {
  console.log(chunk);
});
Upvotes: 5
Reputation: 2658
I'm the author of Scramjet, a functional stream processing framework that may be an easy solution for you.
If you're ok with adding a total of just 3 dependencies to your project then it couldn't be easier:
const {StringStream} = require("scramjet");
// Pass an async generator function to StringStream.from.
StringStream.from(async function* () {
// `something()` is not defined in this snippet - presumably a placeholder for
// any promise-returning call whose result should be yielded.
yield await something();
// The `...` below is the answer's own ellipsis (elided code), not spread syntax.
...
});
If you want to implement this on your own, take a look at the source code in DataStream line 112 - it should be fairly easy to implement. In general you'd need to implement something like this:
/**
 * Pipes the values of an async generator into a PassThrough stream.
 *
 * Values are written in order; when `write()` reports a full buffer the
 * loop waits for `drain` before pulling the next value. The stream is ended
 * once the generator completes and destroyed with the error if it throws.
 *
 * @param {Function} gen - Function returning an (async) iterable of chunks.
 * @returns {PassThrough} Stream carrying the generator's values.
 */
function generatorToStream(gen) {
  // obtain an iterator
  const iter = gen();
  // create your output
  const out = new PassThrough();
  // this IIFE will do all the work
  (async () => {
    for await (const chunk of iter) {
      // if write returns true, continue, otherwise wait until out is drained.
      if (!out.write(chunk)) {
        await new Promise((res) => out.once("drain", res));
      }
    }
    // without end() the consumer would never see the stream finish
    out.end();
  })()
    // handle errors by failing the output stream with them
    .catch((e) => out.destroy(e));
  // return the output stream
  return out;
}
The above example is more or less what happens in scramjet - there is a little more optimization there, such as keeping fewer event handlers, but the above should work well in a simple case.
Upvotes: 2