Reputation: 2462
I have a Readable that pipes into a WritableStream (https://github.com/fb55/htmlparser2?tab=readme-ov-file#usage-with-streams) from a library that provides a SAX interface for parsing data. I want to pipe the SAX stream into a new stream that takes the parsed data and performs buffered inserts into a database (using async/await).
That is the objective. Here's what I managed to do so far without using pipes:
const content: Readable = getFileFromS3()
const chunkSize: number = 50
const handler = async (data) => ...
const writableKafkaStream = new KafkaWritableStream(chunkSize, handler)
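For context, KafkaWritableStream is a custom class. Simplified, it is roughly an object-mode Writable that buffers chunkSize parsed objects and flushes each full batch through the async handler; the sketch below is illustrative rather than the exact implementation:

import { Writable } from 'stream'

// Illustrative sketch: an object-mode Writable that buffers `chunkSize`
// parsed objects and flushes each full batch through the async `handler`.
class KafkaWritableStream extends Writable {
  private buffer: any[] = []

  constructor(
    private chunkSize: number,
    private handler: (batch: any[]) => Promise<void>,
  ) {
    super({ objectMode: true })
  }

  _write(item: any, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    this.buffer.push(item)
    if (this.buffer.length < this.chunkSize) return callback()
    const batch = this.buffer
    this.buffer = []
    // The callback fires only after the batch has been handled.
    this.handler(batch).then(() => callback(), callback)
  }

  _final(callback: (error?: Error | null) => void) {
    // Flush whatever is left in the buffer on end().
    if (this.buffer.length === 0) return callback()
    this.handler(this.buffer).then(() => callback(), callback)
  }
}

The relevant part is that the _write callback only fires once the batch handler has finished, which is what makes write() returning false and the 'drain' event meaningful.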
const currentParsingObject = {}
const parser = new SaxWritableStream(
  {
    onopentag(name, attributes) {
      // ... construct data based on opened tag
      currentParsingObject.someProp = name
    },
    ontext(text) {
      // ... construct data based on text
      currentParsingObject.text = text
    },
    onclosetag(tagname) {
      // ... finish constructing data and write to a stream
      currentParsingObject.someProp2 = tagname
      // sending data to another stream
      writableKafkaStream.write(currentParsingObject)
    },
  },
  { xmlMode: true },
)
return new Promise(resolve => {
  content.pipe(parser).on('finish', () => {
    writableKafkaStream.end()
    resolve(count)
  })
})
But it looks like this overflows writableKafkaStream and does not pause the original stream while the writes to writableKafkaStream are in progress.
I am looking for a way to pipe into writableKafkaStream. Since SaxWritableStream only triggers event callbacks (ontext, onopentag, etc.), I'm not sure how to direct the parsed data to another stream through piping.
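For illustration, the shape I am imagining is an object-mode Transform that wraps htmlparser2's callback-based Parser and pushes each completed object downstream, so the parsed data itself becomes pipeable. This is only a sketch (the SaxObjectStream name is made up), and as far as I can tell it still parses each incoming chunk in one synchronous go, so by itself it would not prevent the overflow described above:

import { Transform, TransformCallback } from 'stream'
import { Parser } from 'htmlparser2'

// Sketch: feed raw chunks into the callback-based Parser and push each
// completed object to the readable (object-mode) side, so it can be piped.
class SaxObjectStream extends Transform {
  private parser: Parser
  private current: any = {}

  constructor() {
    super({ readableObjectMode: true })
    this.parser = new Parser(
      {
        onopentag: (name) => { this.current.someProp = name },
        ontext: (text) => { this.current.text = text },
        onclosetag: (tagname) => {
          this.current.someProp2 = tagname
          this.push(this.current) // hand the parsed object downstream
          this.current = {}
        },
      },
      { xmlMode: true },
    )
  }

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
    this.parser.write(chunk.toString())
    callback()
  }

  _flush(callback: TransformCallback) {
    this.parser.end()
    callback()
  }
}

// usage: content.pipe(new SaxObjectStream()).pipe(writableKafkaStream)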
EDIT: I have added some pauses to the original stream when write returns false:
onclosetag(tagname) {
  if (shouldAddToWritableKafkaStream) {
    if (!writableKafkaStream.write(data)) {
      content.pause()
      writableKafkaStream.once('drain', () => {
        content.resume()
      })
    }
    count += 1
  }
},
It seems to work (memory consumption stays low), but it produces lots of warnings:
(node:26490) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 drain listeners added to [KafkaTransporterTransformStream]. Use emitter.setMaxListeners() to increase limit
Upvotes: 0
Views: 112
Reputation: 17417
You use the throttling mechanism "if write returns false, then pause and resume on drain". But this throttles only the chunks being written to the SAX parser, and even a single chunk can be so big that it leads to many writableKafkaStream.write operations, which may still overflow Kafka. This also explains the warnings about too many drain handlers that you observe.
I propose a different throttling mechanism which ensures that a write operation to Kafka finishes before the next one starts. This requires an asynchronous writeToKafka function which awaits the callback from the writableKafkaStream.write operation (assuming this callback happens only after Kafka has received the data):
function writeToKafka(currentParsingObject) {
  return new Promise(function(resolve, reject) {
    writableKafkaStream.write(currentParsingObject, function(err) {
      if (err) reject(err);
      else resolve();
    });
  });
}
Using this function, the synchronous onclosetag handler can maintain a queue of promises which makes the writing to Kafka asynchronous. To also pause the original stream, pipe your content into a Writable stream that passes it on to the SAX parser only after the previous queue has completed:
var queue = Promise.resolve();

const parser = new SaxWritableStream({
  ...
  onclosetag(tagname) {
    // ... finish constructing data and write to a stream
    currentParsingObject.someProp2 = tagname
    // sending data to another stream: chain the write onto the queue
    queue = queue.then(
      writeToKafka.bind(undefined, currentParsingObject)
    );
  },
  ...
});

content.pipe(new stream.Writable({
  write(chunk, encoding, callback) {
    // wait until the Kafka writes queued by the previous chunk are done
    queue.then(function() {
      queue = Promise.resolve();
      parser.write(chunk);
      callback();
    });
  },
  final(callback) {
    queue.then(function() {
      parser.end();
      callback();
    });
  }
}));
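If you also want to know when everything has been written (like the Promise with resolve(count) in your original code), one option is to listen for the 'finish' event of the wrapping Writable. The extra queue.then is there because the final parser.end() call may queue a few more Kafka writes; this is only a sketch on top of the code above:

content.pipe(new stream.Writable({
  write(chunk, encoding, callback) { /* as above */ },
  final(callback) { /* as above */ }
})).on('finish', function() {
  // also wait for any Kafka writes queued by the final parser.end() call
  queue.then(function() {
    writableKafkaStream.end();
  });
});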
Upvotes: 1