Reputation: 66599
I have a sourceStream consisting of BaseData objects.
I want to fork this stream into n different streams, each of which then filters and transforms the BaseData objects to its liking.
In the end, I want n streams, each containing only a specific type. The forked streams may differ in length, as data might be dropped or added in the future.
I thought I could set this up via fork():
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
I expected to now have two streams, with the foo-stream containing two elements:
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
and the bar-stream containing one element:
{ id: 'bar', data: 'narf' }
Yet I get an error instead:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
How do I fork a stream into multiple streams?
I also tried chaining the calls, but then I only get back one stream's result:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
It prints only:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
Instead of the expected:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{ id: 'bar', data: 'narf' }
It may also be that I misunderstood what fork() is all about. Its doc entry says:
Stream.fork() Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will only pull values from its source as fast as the slowest consumer can handle them.
NOTE: Do not depend on a consistent execution order between the forks. This transform only guarantees that all forks will process a value foo before any will process a second value bar. It does not guarantee the order in which the forks process foo.
TIP: Be careful about modifying stream values within the forks (or using a library that does so). Since the same value will be passed to every fork, changes made in one fork will be visible in any fork that executes after it. Add to that the inconsistent execution order, and you can end up with subtle data corruption bugs. If you need to modify any values, you should make a copy and modify the copy instead.
Deprecation warning: It is currently possible to fork a stream after consuming it (e.g., via a transform). This will no longer be possible in the next major release. If you are going to fork a stream, always call fork on it.
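The TIP above is really about object identity rather than about highland itself: every fork is handed the same object, so an in-place change made in one fork is visible in the others. The following plain TypeScript sketch involves no highland at all; the three fork functions are hypothetical stand-ins for per-fork callbacks, used only to contrast mutating the shared value with modifying a copy.

```typescript
interface BaseData {
  id: string;
  data: string;
}

// Plain TypeScript illustration (no highland): every "fork" below receives
// the very same object reference, as the TIP describes.
const shared: BaseData = { id: "foo", data: "poit" };

// Hypothetical fork callback that copies before modifying (the safe pattern).
function copyingFork(item: BaseData): BaseData {
  return { ...item, data: item.data.toUpperCase() };
}

// Hypothetical fork callback that modifies its input in place (the pattern to avoid).
function mutatingFork(item: BaseData): BaseData {
  item.data = item.data.toUpperCase();
  return item;
}

// Hypothetical fork callback that only reads the value.
function readingFork(item: BaseData): string {
  return item.data;
}

const copied = copyingFork(shared);
const seenAfterCopy = readingFork(shared); // still "poit": the original is untouched

const mutated = mutatingFork(shared);
const seenAfterMutation = readingFork(shared); // now "POIT": the change leaked

console.log(copied, seenAfterCopy);
console.log(mutated, seenAfterMutation);
```

In a highland fork, the copying variant would correspond to a map() callback that returns a new object, for example .map((item: BaseData) => ({ ...item, data: item.data.toUpperCase() })), rather than a callback that assigns to item.data.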
So instead of "How do I fork a stream?", my actual question might be: How do I duplicate a highland stream on the fly into several different streams?
Upvotes: 1
Views: 785
Reputation: 3939
partnerStream.filter() returns a new stream. You're then consuming partnerStream again using partnerStream.each(), without calling fork() or observe(). So either chain the partnerStream.filter().each() calls, or assign the return value of partnerStream.filter() to a variable and call .each() on that.
Upvotes: 1
Reputation: 66599
Bear in mind not to consume a forked stream before all forks have been created. If you consume a forked stream, its "parent" is consumed along with it, so any fork created afterwards is forked from an empty stream.
// First create every fork; nothing is consumed yet.
const partnerStreams = partners.map((partner: string) =>
  sourceStream
    .fork()
    .filter((item: BaseData) => item.id === partner)
);

// Only once all forks exist, consume them; toArray() collects each fork's values.
partnerStreams.forEach((stream, index) => {
  stream.toArray((items: BaseData[]) => {
    console.log(index, items);
  });
});
It prints:
0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
1 [ { id: 'bar', data: 'narf' } ]
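To see why the order of fork creation and consumption matters, here is a toy model in plain TypeScript. ToySharedSource is hypothetical and is not how highland is implemented: it ignores back-pressure entirely and only mimics one property, namely that each source value is pulled once and delivered to whichever forks exist at that moment. With a synchronous, in-memory source like the one in the question, the first consumption therefore drains everything.

```typescript
interface BaseData {
  id: string;
  data: string;
}

// Hypothetical toy model of a source shared by several forks (NOT highland's
// implementation, and without back-pressure). Each value is pulled once and
// handed to the forks registered at that moment.
class ToySharedSource<T> {
  private position = 0;
  private readonly forks: T[][] = [];
  private readonly items: readonly T[];

  constructor(items: readonly T[]) {
    this.items = items;
  }

  // Registers a fork; its values are collected into the returned array.
  fork(): T[] {
    const collected: T[] = [];
    this.forks.push(collected);
    return collected;
  }

  // Pulls all remaining source values, delivering each to every registered fork.
  consume(): void {
    while (this.position < this.items.length) {
      const item = this.items[this.position++];
      for (const fork of this.forks) {
        fork.push(item);
      }
    }
  }
}

const items: BaseData[] = [
  { id: "foo", data: "poit" },
  { id: "foo", data: "fnord" },
  { id: "bar", data: "narf" },
];
const partners = ["foo", "bar"];

// Pattern from the question: fork and consume within the same loop iteration.
const eager = new ToySharedSource(items);
const eagerResults = partners.map((partner) => {
  const fork = eager.fork();
  eager.consume(); // the first iteration already drains the source
  return fork.filter((item) => item.id === partner);
});

// Pattern from this answer: create every fork first, consume afterwards.
const deferred = new ToySharedSource(items);
const forks = partners.map(() => deferred.fork());
deferred.consume();
const deferredResults = forks.map((fork, i) =>
  fork.filter((item) => item.id === partners[i]),
);

console.log(eagerResults.map((r) => r.length)); // prints [ 2, 0 ]
console.log(deferredResults.map((r) => r.length)); // prints [ 2, 1 ]
```

The eager variant reproduces the symptom from the question (the bar fork ends up empty), while the deferred variant matches the output above.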
Upvotes: 0