Reputation: 4393
I want to quickly declare a transform stream without any additional libraries. Making async generators into transform streams via stream.Transform.from
looks like a good choice.
someReadable.pipe(
stream.Transform.from(async function* (source, writable) {
for await (const chunk of source) {
yield JSON.stringify(chunk, null, 2) + "\n\n";
}
})
)
Why does the above not work?
TypeScript throws:
Error:(8, 9) TS2345: Argument of type 'Readable' is not assignable to parameter of type 'WritableStream'.
Type 'Readable' is missing the following properties from type 'WritableStream': writable, write, end
Error:(8, 31) TS2345: Argument of type '(source: any, writable: any) => AsyncGenerator<string, void, unknown>' is not assignable to parameter of type 'Iterable<any> | AsyncIterable<any>'.
Property '[Symbol.asyncIterator]' is missing in type '(source: any, writable: any) => AsyncGenerator<string, void, unknown>' but required in type 'AsyncIterable<any>'.
Error:(14, 8) TS2339: Property 'pipe' does not exist on type 'WritableStream'.
Upvotes: 1
Views: 1562
Reputation: 11
With the new experimental stream.compose
API you can convert async iterables, generators and functions into streams. Example from docs:
import { compose } from 'stream';
import { finished } from 'stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
Upvotes: 1
Reputation: 13662
The read
function is not actually from the Transform
class. Transform
is a child class of Duplex
which is a child of Readable
and Writable
. The only .from
function in the stream
module is the Readable.from
function, so you're actually calling that.
You can verify this yourself:
$ node
Welcome to Node.js v12.14.0.
Type ".help" for more information.
> const stream = require('stream')
> stream.Readable.from === stream.Transform.from
true
Unfortunately, the stream
module does not appear to have Transform.from
or Writable.from
.
From https://nodejs.org/api/stream.html#stream_types_of_streams:
Additionally, this module includes the utility functions
stream.pipeline()
,stream.finished()
andstream.Readable.from()
.
Upvotes: 2