phen0menon

Reputation: 2462

Node.js: Pipe stream with parsed data to another stream

I have a Readable that pipes into a WritableStream (https://github.com/fb55/htmlparser2?tab=readme-ov-file#usage-with-streams) from the htmlparser2 library, which provides a SAX interface for parsing data. I want to pipe the SAX stream into a new stream that takes the parsed data and performs buffered inserts into a database (using async/await).

The objective is the following:

  1. Get file stream from S3
  2. Parse with Writable Stream from htmlparser2
  3. During parsing push chunks of data to Kafka

Here's what I managed to do without using pipes:

const content: Readable = getFileFromS3()

const chunkSize: number = 50
const handler = async (data) => ...

let count = 0 // number of records pushed, resolved when parsing finishes

const writableKafkaStream = new KafkaWritableStream(chunkSize, handler)

const currentParsingObject = {}

const parser = new SaxWritableStream(
    {
        onopentag(name, attributes) {
            // ... construct data based on opened tag
            currentParsingObject.someProp = name
        },

        ontext(text) {
            // ... construct data based on text
            currentParsingObject.text = text
        },

        onclosetag(tagname) {
            // ... finish constructing data and write to a stream
            currentParsingObject.someProp2 = tagname

            // sending data to another stream
            writableKafkaStream.write(currentParsingObject)
        },
    },
    { xmlMode: true },
)

return new Promise(resolve => {
    content.pipe(parser).on('finish', () => {
        writableKafkaStream.end()
        resolve(count)
    })
})
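
For context, KafkaWritableStream is not shown above; think of it as an object-mode Writable that buffers chunkSize records and flushes them through the async handler, roughly like this simplified sketch (the real class differs in details):

import { Writable } from 'stream'

class KafkaWritableStream extends Writable {
    private buffer: any[] = []

    constructor(
        private chunkSize: number,
        private handler: (batch: any[]) => Promise<void>,
    ) {
        super({ objectMode: true })
    }

    // buffer records; once a full batch is collected, flush it via the handler
    _write(record: any, _enc: string, callback: (err?: Error | null) => void) {
        this.buffer.push(record)
        if (this.buffer.length < this.chunkSize) return callback()
        const batch = this.buffer
        this.buffer = []
        this.handler(batch).then(() => callback(), callback)
    }

    // flush whatever partial batch remains when end() is called
    _final(callback: (err?: Error | null) => void) {
        if (this.buffer.length === 0) return callback()
        this.handler(this.buffer).then(() => callback(), callback)
    }
}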

But it looks like this overflows writableKafkaStream: the original stream is not paused while writableKafkaStream is still processing its writes.

I am looking for a way to pipe into writableKafkaStream. Since the SaxWritableStream only triggers event callbacks (ontext, onopentag, etc.), I'm not sure how to direct the parsed data to another stream via piping.

EDIT:

I have added pauses to the original stream when write() returns false:

onclosetag(tagname) {
    if (shouldAddToWritableKafkaStream) {
        if (!writableKafkaStream.write(data)) {
            content.pause()
            writableKafkaStream.once('drain', () => {
                content.resume()
            })
        }
        count += 1
    }
},

It seems to work (memory consumption stays low), but it produces lots of warnings:

(node:26490) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 drain listeners added to [KafkaTransporterTransformStream]. Use emitter.setMaxListeners() to increase limit
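
Presumably the listeners pile up because write returns false several times before a single drain event fires. Guarding so that at most one drain listener is pending at a time would avoid that (a sketch, untested):

let waitingForDrain = false

// inside onclosetag:
if (!writableKafkaStream.write(data)) {
    content.pause()
    if (!waitingForDrain) {
        waitingForDrain = true
        writableKafkaStream.once('drain', () => {
            waitingForDrain = false
            content.resume()
        })
    }
}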

Upvotes: 0

Views: 112

Answers (1)

Heiko Theißen

Reputation: 17417

You use the throttling mechanism "if write returns false, then pause and resume on drain". But this only throttles the chunks being written to the SAX parser, and even a single chunk can be big enough to trigger many writableKafkaStream.write operations, which may still overflow Kafka. It also explains the warnings you observe about too many drain handlers: several are registered before a single drain event fires.

I propose a different throttling mechanism which ensures that a write operation to Kafka finishes before the next one starts. This requires an asynchronous writeToKafka function which awaits the callback from the writableKafkaStream.write operation (assuming this callback happens only after Kafka has received the data):

function writeToKafka(currentParsingObject) {
  return new Promise(function(resolve, reject) {
    writableKafkaStream.write(currentParsingObject, function(err) {
      if (err) reject(err);
      else resolve();
    });
  });
}
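
(Equivalently, since write takes an error-first callback, util.promisify can build the same function; a minimal alternative, assuming the stream instance is in scope:)

const util = require("util");
const writeToKafka = util.promisify(
  writableKafkaStream.write.bind(writableKafkaStream)
);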

Using this function, the synchronous onclosetag handler can maintain a queue of promises which makes the writing to Kafka asynchronous.

To also pause the original stream, pipe your content into a Writable stream that passes it on to the SAX parser only after the previous queue has completed:

const stream = require("stream");

var queue = Promise.resolve();
const parser = new SaxWritableStream({
  ...
  onclosetag(tagname) {
    // ... finish constructing data and write to a stream
    currentParsingObject.someProp2 = tagname
    // sending data to another stream
    queue = queue.then(
      writeToKafka.bind(undefined, currentParsingObject)
    );
  }
  ...
});
content.pipe(new stream.Writable({
  write(chunk, encoding, callback) {
    queue.then(function() {
      queue = Promise.resolve();
      parser.write(chunk);
      callback();
    });
  },
  final(callback) { // runs on end(), before 'finish' is emitted
    queue.then(function() {
      parser.end();
      callback();
    });
  }
}));
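
As an aside, on Node 10+ a Readable is async-iterable, so the same one-chunk-at-a-time backpressure can be sketched without the custom Writable (same assumptions and variables as above):

async function run() {
  for await (const chunk of content) {
    parser.write(chunk);       // synchronously fires the handlers, which extend `queue`
    await queue;               // wait for this chunk's Kafka writes to finish
    queue = Promise.resolve();
  }
  parser.end();
  await queue;                 // writes triggered by the final flush
  writableKafkaStream.end();
}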

Upvotes: 1
