Daniel

Reputation: 381

Peek at first object of stream with highland.js

I want to convert a stream of arbitrary but similarly keyed objects into a stream of CSV-strings. In order to do this, I want to peek at the first object in the stream and make a header-line of the object keys that will be prepended to the stream. I've gotten as far as this:

// Convert an object to a DSV
// NOTE: Assumes that all objects have the same properties!
export let toDSV = (delimiter=', ') => stream => {
  let headerStream = stream.fork().head().flatMap(x => _.keys(x).collect());

  return headerStream
    .flatMap(headers => {
      return _.sequence([
        _([headers.join(delimiter)]),
        stream.fork()
          .pick(headers)
          .map(_.values)
          .flatMap(_.collect)
          .invoke('join', [delimiter])
      ]);
    });
};

For the following stream

_([{
  foo: 1,
  bar: 2,
  baz: 3
}, {
  foo: 2,
  bar: 1,
  baz: 4
}])

The resulting stream should be:

foo, bar, baz
1, 2, 3
2, 1, 4

Only the header row is produced, though. I believe this is a back-pressure problem with fork(): the first fork isn't fully consumed.

So, how can I "peek" at the first element of a stream, and then process the stream based on that information?

Upvotes: 2

Views: 257

Answers (1)

amsross

Reputation: 453

It looks to me like your problem is that you're calling .fork() on stream at the outset: let headerStream = stream.fork()....

Now it's almost like you've got stream(a) and stream(b), where return headerStream.flatMap... is operating on stream(a) and stream.fork().pick(headers)... is waiting for you to start operating on stream(b).

Just nix both of those stream.fork() calls and you should be set.
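
As a rough illustration of the "peek" idea without any forking, here is a minimal, library-independent sketch: it remembers the keys of the first object it sees and formats every object, including the first, with those keys. The helper name makeDSVFormatter is hypothetical and not part of highland.js, and plain arrays stand in for the stream here.

```javascript
// Sketch only: `makeDSVFormatter` is a hypothetical helper, not a Highland API.
// It assumes every object has the same keys as the first one.
const makeDSVFormatter = (delimiter = ', ') => {
  let headers = null; // filled in when the first object arrives

  // Returns the lines to emit for one incoming object.
  return obj => {
    const lines = [];
    if (headers === null) {
      headers = Object.keys(obj);
      lines.push(headers.join(delimiter)); // header line, emitted once
    }
    // Data line, with values in the same order as the header.
    lines.push(headers.map(key => obj[key]).join(delimiter));
    return lines;
  };
};

// A plain array stands in for the stream.
const format = makeDSVFormatter();
const rows = [
  { foo: 1, bar: 2, baz: 3 },
  { foo: 2, bar: 1, baz: 4 }
];
const output = rows.flatMap(format);
console.log(output.join('\n'));
```

With Highland, the same function could presumably be plugged in as stream.flatMap(x => _(format(x))), so the source is consumed only once and no fork() is needed. Treat that wiring as an assumption to check against your Highland version.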

Upvotes: 0
