Reputation: 3425
I am implementing a tokenizer. It parses a document, splits it on a set of possible delimiters, and then gives me combinations of 1-, 2- and 3-word tokens. I was able to achieve my goal, but only in one specific way:
Stream<String> contentStr = file.openRead().transform(utf8.decoder);
Stream<String> tokens = contentStr.transform(charSplitter).transform(tokenizer).asBroadcastStream();
var twoWordTokens = tokens.transform(sliding(2));
var threeWordTokens = tokens.transform(sliding(3));
StreamController<String> merger = StreamController();
tokens.forEach((token) => merger.add(token));
threeWordTokens.forEach((token) => merger.add(token));
twoWordTokens.forEach((token) => merger.add(token));
merger.stream.forEach(print);
As you can see, I create a StreamConsumer (a StreamController, to be precise) and pump every event from the 3 streams into that stream consumer. It works, but I don't like that I add each element from the source streams via the StreamController.add method. I wanted to use StreamController.addStream instead, but that somehow does not work.
The following code gives me a "Bad state: Cannot add event while adding a stream" error, and I understand why:
StreamController<String> merger = StreamController();
merger.addStream(tokens);
merger.addStream(twoWordTokens);
merger.addStream(threeWordTokens);
merger.stream.forEach(print);
This is per the API documentation of StreamController.addStream. So I need to wait for the future returned by each addStream call to complete:
StreamController<String> merger = StreamController();
await merger.addStream(tokens);
await merger.addStream(twoWordTokens);
await merger.addStream(threeWordTokens);
await merger.stream.forEach(print);
But in this case I get nothing printed in the console.
If I do this:
StreamController<String> merger = StreamController();
merger.stream.forEach(print);
await merger.addStream(tokens);
await merger.addStream(twoWordTokens);
await merger.addStream(threeWordTokens);
Then only the 1-word tokens, i.e. the elements of the original broadcast stream, are printed. Elements of the derived streams are not.
I kind of understand why this happens, because all my streams are derived from the original broadcast stream.
Is there a better way to implement such a pipeline?
My problem can probably be reformulated in terms of stream duplication/forking, but I can't see a way to clone a stream in Dart. If you can advise on that - please do.
Upvotes: 1
Views: 1009
Reputation: 71623
I hope to allow concurrent addStream calls at some point, but until then, you need to handle the events independently:
var allAdds = [
tokens.forEach(merger.add),
twoWordTokens.forEach(merger.add),
threeWordTokens.forEach(merger.add)];
Future.wait(allAdds).then((_) { merger.close(); });
merger.stream.forEach(print);
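For reference, here is a minimal self-contained sketch of that pattern using only dart:async. The sliding function below is a hypothetical stand-in for the transformer from the question (its real implementation is not shown there), and mergeTokens is a name made up for this example. The stand-in is built on expand so that each derived stream subscribes to the broadcast source as soon as forEach is called, which should keep the derived streams from missing early events.

```dart
import 'dart:async';

// Hypothetical stand-in for the question's sliding(n): emits every run of
// n consecutive tokens, joined by a space.
StreamTransformer<String, String> sliding(int n) =>
    StreamTransformer.fromBind((source) {
      final window = <String>[];
      return source.expand((token) {
        window.add(token);
        if (window.length > n) window.removeAt(0);
        return window.length == n ? [window.join(' ')] : const <String>[];
      });
    });

// Forwards the 1-, 2- and 3-word tokens into one controller and closes it
// once all three forwarding loops have finished.
Future<List<String>> mergeTokens(Stream<String> source) {
  final tokens = source.asBroadcastStream();
  final twoWordTokens = tokens.transform(sliding(2));
  final threeWordTokens = tokens.transform(sliding(3));

  final merger = StreamController<String>();
  // All three forEach calls subscribe here, before any event is delivered.
  final allAdds = [
    tokens.forEach(merger.add),
    twoWordTokens.forEach(merger.add),
    threeWordTokens.forEach(merger.add),
  ];
  Future.wait(allAdds).then((_) => merger.close());
  return merger.stream.toList();
}

Future<void> main() async {
  final merged = await mergeTokens(Stream.fromIterable(['a', 'b', 'c', 'd']));
  merged.forEach(print);
}
```

Collecting the result with toList is only for demonstration; in the original pipeline you would keep merger.stream.forEach(print) instead.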
That's if you want to control everything yourself. You can also use the StreamGroup class from package:async. It collects a number of streams and emits their events as a single stream.
This assumes that you have no error events.
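The StreamGroup suggestion could look roughly like the sketch below. It assumes package:async has been added as a dependency; mergeWithDerived and the upper-casing stream are hypothetical stand-ins for the question's token and sliding-window streams.

```dart
import 'dart:async';

import 'package:async/async.dart';

// Hypothetical helper: merges a token stream with a stream derived from it.
Stream<String> mergeWithDerived(Stream<String> source) {
  final tokens = source.asBroadcastStream();
  // Stand-in for the question's sliding(n) streams.
  final upperCased = tokens.map((token) => token.toUpperCase());
  // StreamGroup.merge returns a single stream carrying the events of all
  // inputs; it closes once every input stream is done.
  return StreamGroup.merge([tokens, upperCased]);
}

Future<void> main() async {
  await mergeWithDerived(Stream.fromIterable(['a', 'b', 'c'])).forEach(print);
}
```

With the streams from the question, the call would be StreamGroup.merge([tokens, twoWordTokens, threeWordTokens]).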
Upvotes: 1