Reputation: 66599
When using Highland.js to create a stream, I know I can consume it like so:
import * as high from "highland";
console.log("Single stream:");
const stream0 = high([1, 2, 3, 4, 5]);
stream0.each(console.log); // each number will be printed
Now I want to fork the stream into two, do something on both of them, and then merge the two forks back together into one stream. (Or to phrase it differently: I want to partition the stream into two, yet the results of both partitions should end up in the same response stream.)
Yet I am unable to make it work. The example is highly contrived but showcases my problem:
import * as high from "highland";
const stream = high([1, 2, 3, 4, 5]);
const fork1 = stream.fork().filter(i => i % 2 === 0);
const fork2 = stream.fork().filter(i => i % 2 !== 0);
const result = fork1.concat(fork2);
result.each(console.log); // doesn't do anything
The result stream stays paused. I think it has to do with fork only starting after all the forks have started processing the input stream, yet I am unsure how to start it.
Upvotes: 0
Views: 125
Reputation: 453
I would use merge for this:
const high = require('highland')
const stream = high([1, 2, 3, 4, 5])
const fork1 = stream.fork().filter(i => i % 2 === 0)
const fork2 = stream.fork().filter(i => i % 2 !== 0)
high([fork1, fork2]).merge().each(console.log) // 1 2 3 4 5
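As a rough illustration of why merge helps where concat stalls: forks share back-pressure, so the source only hands out a value once every fork is ready for it. concat reads its first stream to the end before touching the second, so the second fork never asks for data and nothing moves. The sketch below is a toy model of that idea, not Highland's implementation; SharedSource, consume, and the "concat-like"/"merge-like" strategies are hypothetical names made up for this example.

```javascript
// Toy model of shared back-pressure between forks (NOT Highland's real code).
class SharedSource {
  constructor(values) {
    this.values = [...values];
    this.demand = []; // one flag per fork: has it asked for the next value?
  }

  // Register a new fork and return its index.
  fork() {
    this.demand.push(false);
    return this.demand.length - 1;
  }

  // A fork signals that it is ready for the next value.
  request(fork) {
    this.demand[fork] = true;
  }

  // Hand out one value, but only if every fork has asked for it.
  tick() {
    if (this.values.length === 0 || !this.demand.every(Boolean)) {
      return undefined;
    }
    this.demand.fill(false);
    return this.values.shift();
  }
}

// Run the source with a strategy that decides which forks pull on each step.
function consume(values, readyForks) {
  const source = new SharedSource(values);
  const even = source.fork();
  const odd = source.fork();
  const seen = [];
  for (let step = 0; step < values.length; step++) {
    readyForks({ even, odd }).forEach((f) => source.request(f));
    const value = source.tick();
    if (value === undefined) break; // stalled: some fork never asked
    seen.push(value);
  }
  return seen;
}

// concat-like: only the first fork pulls, the second waits for the first to end.
const concatLike = consume([1, 2, 3, 4, 5], ({ even }) => [even]);
// merge-like: both forks pull at the same time.
const mergeLike = consume([1, 2, 3, 4, 5], ({ even, odd }) => [even, odd]);

console.log(concatLike); // []
console.log(mergeLike); // [ 1, 2, 3, 4, 5 ]
```

In this model the concat-like consumer never receives anything, because the second fork never signals demand, while the merge-like consumer drains the source in order.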
Upvotes: 1
Reputation: 66599
I managed to get it working by resuming the source stream after defining the forks and the concatenation:
import * as high from "highland";
const stream = high([1, 2, 3, 4, 5]);
const fork1 = stream.fork().filter(i => i % 2 === 0);
const fork2 = stream.fork().filter(i => i % 2 !== 0);
const result = fork1.concat(fork2);
stream.resume(); // starts the source stream!
result.each(console.log); // now prints each value
Now it prints each value (evens first, then odds, which is to be expected with concat):
2
4
1
3
5
Upvotes: 0