Reputation: 381
I want to convert a stream of arbitrary but similarly keyed objects into a stream of CSV-strings. In order to do this, I want to peek at the first object in the stream and make a header-line of the object keys that will be prepended to the stream. I've gotten as far as this:
// Convert an object to a DSV
// NOTE: Assumes that all objects have the same properties!
export let toDSV = (delimiter = ', ') => stream => {
    let headerStream = stream.fork().head().flatMap(x => _.keys(x).collect());
    return headerStream
        .flatMap(headers => {
            return _.sequence([
                _([headers.join(delimiter)]),
                stream.fork()
                    .pick(headers)
                    .map(_.values)
                    .flatMap(_.collect)
                    .invoke('join', [delimiter])
            ]);
        });
};
For the following stream
_([{
    foo: 1,
    bar: 2,
    baz: 3
}, {
    foo: 2,
    bar: 1,
    baz: 4
}])
The resulting stream should be:
foo, bar, baz
1, 2, 3
2, 1, 4
Only the header row is created, though, because of what I believe to be a back-pressure problem with fork(), where the first stream isn't fully consumed.
So, how can I "peek" at the first element of a stream, and then process the stream based on that information?
Upvotes: 2
Views: 257
Reputation: 453
It looks to me like your problem is that you're calling .fork() on stream at the outset: let headerStream = stream.fork()...
Now it's almost like you've got stream(a) and stream(b), where return headerStream.flatMap... is operating on stream(b) and stream.fork().pick(headers)... is waiting for you to start operating on stream(b).
Just nix both of those stream.fork() calls and you should be set.
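In case it helps to see the "peek at the first element, then process the rest" idea on its own, here is a minimal sketch in plain JavaScript using an iterator instead of Highland streams. It is not the Highland API and not a drop-in replacement for the code in the question; toDSVLines is a hypothetical helper name made up for this illustration, and it assumes, like the question, that every object has the same keys as the first one.

```javascript
// Plain-JavaScript sketch (no Highland): peek at the first object,
// emit a header line built from its keys, then emit one line per object.
// toDSVLines is a hypothetical name used only for this illustration.
function* toDSVLines(objects, delimiter = ', ') {
  const iterator = objects[Symbol.iterator]();
  const first = iterator.next();
  if (first.done) return; // empty input: no header, no rows

  // The header comes from the keys of the first object only.
  const headers = Object.keys(first.value);
  yield headers.join(delimiter);

  // Emit the peeked object as a row so it is not lost.
  yield headers.map(key => first.value[key]).join(delimiter);

  // Keep reading from the same iterator; it resumes after the first object.
  for (let next = iterator.next(); !next.done; next = iterator.next()) {
    yield headers.map(key => next.value[key]).join(delimiter);
  }
}

const lines = [...toDSVLines([
  { foo: 1, bar: 2, baz: 3 },
  { foo: 2, bar: 1, baz: 4 }
])];
console.log(lines.join('\n'));
```

The point of the sketch is that a single consumer reads the source: the first element is held on to and re-emitted, so nothing has to wait on a second consumer of the same source.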
Upvotes: 0