Doofus

Reputation: 1102

Stream transformer from async generator and options

Node.js streams support creating a transform stream from an async generator function. As the docs state:

async generators are effectively a first-class language-level stream construct at this point [1]

const stream = require('node:stream');

// Pass-through transform: yields every chunk unchanged.
const tf = stream.Duplex.from(async function* (source) {
  for await (const chunk of source)
    yield chunk;
});

However, it does not appear to support an options argument, in contrast to many other stream-construction methods. For example, objectMode is apparently hardcoded to true [2], and the same problem arises for highWaterMark. The docs state:

Attempting to switch an existing stream into object mode is not safe [3]

(They say nothing about switching it off.)

Therefore the missing options parameter is confusing to me.

Perhaps there are reasons why such streams should always be in object mode, but I don't see them. As with readable streams, a transformer like the following makes perfect sense to me (an XOR operation for demo purposes; imagine deflate, for example):

const tf = stream.Duplex.from(async function* (source) {
  for await (const chunk of source) {
    for (let i = 0; i < chunk.length; i++) chunk[i] ^= 0x7C;
    yield chunk;
  }
});

I couldn't find anything potentially difficult about adding an options parameter either. Therefore:

Is there a way to change options when creating streams this way? (if not, why?)

Upvotes: 0

Views: 2272

Answers (1)

jorgenkg

Reputation: 4275

The execution path for Duplex.from hardcodes highWaterMark: 1 and objectMode: true in the Node source code. Readable.from uses the same values, but only as defaults that its optional options argument can override. The implementation for how Duplexes are created from various data sources is here.
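If you want to check this on your own Node version rather than read the source, the stream's public properties report the effective settings. This is only a minimal sketch: the `settings` object is a name made up for illustration, and the values printed may differ between Node releases.

```javascript
const { Duplex } = require('node:stream');

// Same pass-through generator as in the question.
const tf = Duplex.from(async function* (source) {
  for await (const chunk of source) yield chunk;
});

// Collect the effective settings of both sides of the duplex.
// `settings` is a hypothetical name used only for this illustration.
const settings = {
  readableObjectMode: tf.readableObjectMode,
  writableObjectMode: tf.writableObjectMode,
  readableHighWaterMark: tf.readableHighWaterMark,
  writableHighWaterMark: tf.writableHighWaterMark,
};

console.log(settings);
```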

Note that using .from() does not change the fact that the stream's source is an (async) generator, and generators process one chunk of data at a time, running until the next yield. Consequently, setting highWaterMark > 1 would be like adding an array inside the generator that preemptively consumes the items to process next, which defeats the purpose: generators are expected to process the items of an iterable sequentially, one at a time.

The same argument applies to objectMode: true. The generator receives the items of an iterable as inputs and processes them sequentially, rather than fetching chunks of bytes from a buffer.

Thus, for use-cases where the input is a continuous stream of bytes, one may instead use the lower-level APIs stream.Readable, stream.Writable and stream.Duplex.
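As a rough sketch of that lower-level route, the XOR transformer from the question could be written with stream.Transform (a Duplex subclass, swapped in here because it fits a one-in, one-out transform), which does accept objectMode and highWaterMark. The names `xorInPlace` and `createXorTransform`, and the 64 KiB default, are assumptions made up for this illustration.

```javascript
const { Transform } = require('node:stream');

// Hypothetical helper: XOR every byte of a Buffer with a one-byte key, in place.
function xorInPlace(chunk, key) {
  for (let i = 0; i < chunk.length; i++) chunk[i] ^= key;
  return chunk;
}

// Hypothetical factory: a byte-mode transform with a caller-chosen highWaterMark.
function createXorTransform(key = 0x7c, highWaterMark = 64 * 1024) {
  return new Transform({
    objectMode: false, // chunks are Buffers, not arbitrary objects
    highWaterMark,     // buffer threshold in bytes for both sides
    transform(chunk, encoding, callback) {
      // Hand the transformed chunk to the readable side.
      callback(null, xorInPlace(chunk, key));
    },
  });
}
```

It can then be piped like any other byte stream, for example: process.stdin.pipe(createXorTransform()).pipe(process.stdout);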

Upvotes: 3
