I tried to write a program with highland.js to download several files, unzip them, parse them into objects, then merge the object streams into one stream with `flatMap` and print them out.
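```javascript
const _ = require('highland');
const request = require('request');
const zlib = require('zlib');

// toObjParser: an osm2obj Transform stream (see the PS below).
// Note: this one instance is shared by every download() call, and a Node
// Transform cannot be written to again once it has ended.
function download(url) {
  return _(request(url))
    .through(zlib.createGunzip())
    .errors((err) => console.log('Error in gunzip', err))
    .through(toObjParser)
    .errors((err) => console.log('Error in OsmToObj', err));
}

const urlList = ['url_1', 'url_2', 'url_3'];
_(urlList)
  .flatMap(download)
  .each(console.log);
```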
When all URLs are valid, it works fine. If a URL is invalid, no file is downloaded and gunzip reports an error. I suspect the stream closes when the error occurs. I expected `flatMap` to continue with the other streams; however, the program doesn't download the other files and nothing is printed out.
What's the correct way to handle errors in a stream, and how do I make `flatMap` keep going after one stream has an error?
In imperative programming, I can add debug logs to trace where an error happens. How do I debug streaming code?
PS. `toObjParser` is a Node Transform stream. It takes a readable stream of OSM XML and outputs a stream of objects compatible with Overpass OSM JSON. See https://www.npmjs.com/package/osm2obj
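To exercise the pipeline without osm2obj or the network, a stand-in parser can help. The sketch below is hypothetical and is not the osm2obj API: `makeFakeParser` is an illustrative name, and it only emits `{ line }` objects (one per non-empty text line) from a Node `Transform` whose readable side is in object mode. It is written as a factory so that every download would get a fresh instance, because a Transform cannot be written to after it has ended; the 2017-12-19 update below does the same thing with `new OsmToObj()`.

```javascript
const { Transform } = require('stream');

// Hypothetical stand-in for toObjParser (NOT osm2obj): reads text chunks and
// emits one { line } object per non-empty line, in objectMode downstream.
function makeFakeParser() {
  let buffered = '';
  return new Transform({
    readableObjectMode: true, // objects out, bytes/strings in
    transform(chunk, _encoding, callback) {
      buffered += chunk.toString('utf8');
      const lines = buffered.split('\n');
      buffered = lines.pop(); // keep a trailing partial line for the next chunk
      for (const line of lines) {
        if (line.trim() !== '') this.push({ line: line.trim() });
      }
      callback();
    },
    flush(callback) {
      // Emit whatever partial line is left when the input ends.
      if (buffered.trim() !== '') this.push({ line: buffered.trim() });
      callback();
    },
  });
}
```

A fresh parser per source keeps one failing or finished download from affecting the parser used by the next.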
2017-12-19 update:
I tried calling `push` in `errors` as @amsross suggested. To verify that `push` really works, I pushed an XML document; it was parsed by the following parser and I saw it in the output. However, the stream still stopped and url_3 was not downloaded.
```javascript
const _ = require('highland');
const request = require('request');
const zlib = require('zlib');
// OsmToObj: presumably the Transform stream class from osm2obj (import omitted).

function download(url) {
  console.log('download', url);
  return _(request(url))
    .through(zlib.createGunzip())
    .errors((err, push) => {
      console.log('Error in gunzip', err);
      // Push a hard-coded OSM change document to check that push() reaches the parser.
      push(null, Buffer.from(`<?xml version='1.0' encoding='UTF-8'?>
<osmChange version="0.6">
  <delete>
    <node id="1" version="2" timestamp="2008-10-15T10:06:55Z" uid="5553" user="foo" changeset="1" lat="30.2719406" lon="120.1663723"/>
  </delete>
</osmChange>`));
    })
    .through(new OsmToObj())
    .errors((err) => console.log('Error in OsmToObj', err));
}

const urlList = ['url_1_correct', 'url_2_wrong', 'url_3_correct'];
_(urlList)
  .flatMap(download)
  .each(console.log);
```
Answer:
Update 12/19/2017:
OK, so I can't tell you exactly why this happens, but I can tell you that switching from consuming the streams returned by `download` in sequence to `merge`-ing them together will probably give you the result you're after. Unfortunately (or not?), you will no longer get the results back in any prescribed order.
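One possible reason for the difference, offered as an assumption rather than a confirmed diagnosis: in highland, `flatMap(f)` is equivalent to `.map(f).sequence()`, which consumes the inner streams one at a time and only moves on once the current one ends, while `merge` reads from all of them in parallel. If the gunzip stream for the bad URL errors without ever ending, a sequenced pipeline could wait on it indefinitely. The core-Node sketch below (no highland; `probeGunzip` is a hypothetical helper name) feeds bytes that are not gzip, standing in for an error page served for a wrong URL, into `zlib.createGunzip()` and records which events fire. On recent Node versions one would expect `'error'` but no `'end'` for the bad body.

```javascript
const zlib = require('zlib');

// Hypothetical helper: push `bytes` through a fresh gunzip stream and resolve
// with the lifecycle events ('error', 'end') that were observed.
function probeGunzip(bytes) {
  return new Promise((resolve) => {
    const events = [];
    const gunzip = zlib.createGunzip();
    gunzip.on('data', () => {}); // consume output so 'end' can fire if reached
    gunzip.on('end', () => {
      events.push('end');
      resolve(events);
    });
    gunzip.on('error', () => {
      events.push('error');
      // Wait briefly in case an 'end' is still pending, then report.
      setTimeout(() => resolve(events), 50);
    });
    gunzip.end(bytes);
  });
}

// Usage: compare a real gzip payload with a non-gzip body.
probeGunzip(zlib.gzipSync('<osm/>')).then((e) => console.log('gzip body:', e));
probeGunzip(Buffer.from('<html>404</html>')).then((e) => console.log('bad body:', e));
```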
```javascript
const request = require('request')
const zlib = require('zlib')
const h = require('highland')

// just so you can see there isn't some sort of race
const rnd = (min, max) => Math.floor((Math.random() * (max - min))) + min
const delay = ms => x => h(push => setTimeout(() => {
  push(null, x)
  push(null, h.nil)
}, ms))

const download = url => h(request(url))
  .flatMap(delay(rnd(0, 2000)))
  .through(zlib.createGunzip())

h(['url_1_correct', 'url_2_wrong', 'url_3_correct'])
  .map(download).merge()
  // vs .flatMap(download) or .map(download).sequence()
  .errors(err => h.log(err))
  .each(h.log)
```
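For comparison, here is the same "start every source at once and keep going when one fails" idea without highland. It swaps in a different technique, promises plus async iteration over Node streams, so treat it as an analogy to `merge` rather than a description of how highland works internally. `collectGunzipped` and `gunzipAll` are hypothetical names, and in-memory buffers stand in for the HTTP responses.

```javascript
const zlib = require('zlib');

// Hypothetical helper: gunzip one in-memory body (a stand-in for an HTTP
// response) and resolve with its text. A gunzip error rejects only this call.
async function collectGunzipped(body) {
  const gunzip = zlib.createGunzip();
  gunzip.end(body);
  const chunks = [];
  for await (const chunk of gunzip) chunks.push(chunk);
  return Buffer.concat(chunks).toString('utf8');
}

// Start all sources concurrently and wait for every one of them to settle,
// so a bad body is reported without stopping the others.
async function gunzipAll(bodies) {
  const settled = await Promise.allSettled(bodies.map(collectGunzipped));
  return settled.map((r) =>
    r.status === 'fulfilled' ? r.value : `error: ${r.reason.message}`
  );
}
```

Unlike `merge`, `Promise.allSettled` hands back results in input order, but each source still succeeds or fails on its own; the exact error text comes from zlib.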
Update 12/03/2017:
When an error is encountered on the stream, it ends that stream. To avoid this, you need to handle the error. You are currently using `errors` to report the error, but not to handle it. You can do something like this to move on to the next value in the stream:
```javascript
.errors((err, push) => {
  console.log(err)
  push(null) // push no error forward
})
```
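When the order of the sources matters, the "handle the error, then move on" idea can also be sketched without highland. This is a swapped-in, core-Node technique (async iteration inside a `for...of` loop with a per-source `try`/`catch`), not highland's `errors` API, and `gunzipInOrder` is a hypothetical name. Because a failing source settles by throwing instead of hanging, the loop continues with the next one and results stay in input order.

```javascript
const zlib = require('zlib');

// Hypothetical helper: gunzip each body one after another. An error is
// logged and recorded as null, and the loop continues with the next body.
async function gunzipInOrder(bodies) {
  const results = [];
  for (const body of bodies) {
    try {
      const gunzip = zlib.createGunzip();
      gunzip.end(body);
      const chunks = [];
      for await (const chunk of gunzip) chunks.push(chunk);
      results.push(Buffer.concat(chunks).toString('utf8'));
    } catch (err) {
      console.log('skipping body:', err.message); // report, then move on
      results.push(null);
    }
  }
  return results;
}
```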
Original:
It's difficult to answer without knowing what the input and output types of `toObjParser` are.
Because `through` passes a stream of values to the provided function and expects a stream of values in return, your issue may be that `toObjParser` has a signature like `Stream -> Object` or `Stream -> Stream Object`, where the errors occur on the inner stream, which will not emit any errors until it is consumed.
What is the output of `.each(console.log)`? If it is logging a stream, that is most likely your problem.
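One quick way to check this from `.each` is to test whether each value is itself a stream before printing it. The sketch below only recognizes Node streams, via `instanceof Stream`; a highland stream would be checked with highland's own `_.isStream(x)` instead. `isNodeStream` and `describeValue` are hypothetical names.

```javascript
const { Stream } = require('stream');

// True for Node streams (Readable, Writable, Duplex, Transform, PassThrough).
function isNodeStream(x) {
  return x instanceof Stream;
}

// Hypothetical replacement for console.log in .each(...): flag values that are
// streams, since their contents (and errors) only appear once they are consumed.
function describeValue(x) {
  return isNodeStream(x) ? '[unconsumed stream]' : x;
}

// e.g. .each(x => console.log(describeValue(x)))
```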