aleung

Reputation: 10298

Why does flatMap produce no output when one stream has an error?

I wrote a program with highland.js that downloads several files, gunzips them, and parses them into objects. It then uses flatMap to merge the resulting object streams into one stream and prints the result.

function download(url) {
    return _(request(url))
        .through(zlib.createGunzip())
        .errors((err) => console.log('Error in gunzip', err))
        .through(toObjParser)
        .errors((err) => console.log('Error in OsmToObj', err));
}

const urlList = ['url_1', 'url_2', 'url_3'];

_(urlList)
    .flatMap(download)
    .each(console.log);

When all URLs are valid, it works fine. If a URL is invalid, no file is downloaded and gunzip reports an error. I suspect that the stream closes when the error occurs. I expected flatMap to continue with the other streams, but the program doesn't download the other files and nothing is printed.

What is the correct way to handle errors in a stream, and how can I make flatMap continue after one stream has an error?

In imperative programming, I can add debug logs to trace where an error happens. How do I debug streaming code?

PS. toObjParser is a Node Transform Stream. It takes a readable stream of OSM XML and outputs a stream of objects compatible with Overpass OSM JSON. See https://www.npmjs.com/package/osm2obj

2017-12-19 update:

I tried calling push in errors, as @amsross suggested. To verify that push really works, I pushed an XML document; the parser that follows parsed it and I saw the result in the output. However, the stream still stopped and url_3 was not downloaded.

function download(url) {
    console.log('download', url);
    return _(request(url))
        .through(zlib.createGunzip())
        .errors((err, push) => {
            console.log('Error in gunzip', err);
            push(null, Buffer.from(`<?xml version='1.0' encoding='UTF-8'?>
<osmChange version="0.6">
<delete>
<node id="1" version="2" timestamp="2008-10-15T10:06:55Z" uid="5553" user="foo" changeset="1" lat="30.2719406" lon="120.1663723"/>
</delete>
</osmChange>`));
        })
        .through(new OsmToObj())
        .errors((err) => console.log('Error in OsmToObj', err));
}

const urlList = ['url_1_correct', 'url_2_wrong', 'url_3_correct'];

_(urlList)
    .flatMap(download)
    .each(console.log);

Upvotes: 0

Views: 453

Answers (1)

amsross

Reputation: 453

Update 12/19/2017: OK, I can't give you a good explanation of why, but I can tell you that switching from consuming the streams returned by download in sequence to merging them together will probably give you the result you're after. Unfortunately (or not?), the results will no longer come back in any prescribed order.

const request = require('request')
const zlib = require('zlib')
const h = require('highland')

// just so you can see there isn't some sort of race
const rnd = (min, max) => Math.floor((Math.random() * (max - min))) + min
const delay = ms => x => h(push => setTimeout(() => {
  push(null, x)
  push(null, h.nil)
}, ms))

const download = url => h(request(url))
  .flatMap(delay(rnd(0, 2000)))
  .through(zlib.createGunzip())

h(['url_1_correct', 'url_2_wrong', 'url_3_correct'])
  .map(download).merge()
  // vs .flatMap(download) or .map(download).sequence()
  .errors(err => h.log(err))
  .each(h.log)
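
One plausible factor behind the difference, offered as an assumption rather than something confirmed here: a Node Gunzip stream that receives invalid input emits 'error' but never emits 'end'. A consumer that waits for each inner stream to end before starting the next one could then wait indefinitely, while a merging consumer does not depend on that 'end'. The sketch below uses only Node core modules (no Highland) to observe which events fire; observeGunzip is a hypothetical helper name introduced for illustration.

```javascript
const zlib = require('zlib');
const { finished } = require('stream');

// Hypothetical helper: feed `input` to a fresh Gunzip stream and
// record whether it reported an error and whether it emitted 'end'.
function observeGunzip(input) {
  return new Promise(resolve => {
    const gunzip = zlib.createGunzip();
    const seen = { error: null, ended: false };

    gunzip.on('end', () => { seen.ended = true; });
    gunzip.resume(); // drain the readable side so 'end' can fire on success

    // `finished` calls back once the stream has errored or fully completed.
    finished(gunzip, err => {
      seen.error = err || null;
      resolve(seen);
    });

    gunzip.end(input);
  });
}

// Invalid input: expect an error and no 'end' event.
observeGunzip(Buffer.from('this is not gzip data')).then(seen =>
  console.log('invalid input -> error:', Boolean(seen.error), 'ended:', seen.ended));

// Valid input: expect no error and an 'end' event.
observeGunzip(zlib.gzipSync('hello')).then(seen =>
  console.log('valid input   -> error:', Boolean(seen.error), 'ended:', seen.ended));
```

If the invalid case reports an error with ended still false, that is consistent with a sequential consumer stalling on the failed download, though it does not by itself prove what Highland does internally.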

Update 12/03/2017: When an error is encountered on a stream, it ends that stream. To avoid this, you need to handle the error. You are currently using errors to report the error, not to handle it. You can do something like this to move on to the next value in the stream:

.errors((err, push) => {
  console.log(err)
  push(null) // push no error forward
})

Original: It's difficult to answer without knowing what the input and output types of toObjParser are.

through passes a stream of values to the provided function and expects a stream of values in return. Your issue may be that toObjParser has a signature like Stream -> Object or Stream -> Stream Object, in which case the errors occur on the inner stream, which will not emit any errors until it is consumed.

What is the output of .each(console.log)? If it is logging a stream, that is most likely your problem.
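
To make that check easier than reading raw console.log output, here is a minimal sketch of a debugging helper. describeChunk is a hypothetical name, not part of Highland; it duck-types on a pipe method, which Node streams have and which Highland streams are assumed to expose as well.

```javascript
const { PassThrough } = require('stream');

// Hypothetical debugging helper: classify a value seen by .each().
// Anything with a .pipe() method is treated as a stream.
function describeChunk(x) {
  if (x !== null && typeof x === 'object' && typeof x.pipe === 'function') {
    return 'stream';
  }
  if (Buffer.isBuffer(x)) {
    return 'buffer';
  }
  if (x !== null && typeof x === 'object') {
    return 'object';
  }
  return typeof x;
}

// Stand-alone demonstration with Node core values only.
console.log(describeChunk(new PassThrough()));        // a Node stream
console.log(describeChunk({ type: 'node', id: 1 }));  // a parsed object
console.log(describeChunk(Buffer.from('<osm/>')));    // raw bytes
```

In the pipeline from the question this could be used as .each(x => console.log(describeChunk(x))). Seeing 'stream' there would suggest the inner stream still needs to be flattened, and 'buffer' would suggest the parser step is not producing objects.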

Upvotes: 0
