Alexander Arendar

Cannot add multiple streams derived from the same broadcast stream to a StreamController

I am implementing a tokenizer. It parses a document, splits it on a set of possible delimiters, and then provides me with a combination of 1-, 2- and 3-word tokens. I was able to achieve my goal, but only in one specific way:

  Stream<String> contentStr = file.openRead().transform(utf8.decoder);
  Stream<String> tokens = contentStr.transform(charSplitter).transform(tokenizer).asBroadcastStream();
  var twoWordTokens = tokens.transform(sliding(2));
  var threeWordTokens = tokens.transform(sliding(3));
  StreamController<String> merger = StreamController();
  tokens.forEach((token) => merger.add(token));
  threeWordTokens.forEach((token) => merger.add(token));
  twoWordTokens.forEach((token) => merger.add(token));
  merger.stream.forEach(print);
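
The question does not show charSplitter, tokenizer or sliding. As a rough, hypothetical sketch, sliding(n) could be a StreamTransformer that keeps the last n tokens and emits them joined by a space once the window is full. Splitting a string on spaces is likewise only an assumed stand-in for the real charSplitter and tokenizer.

```dart
import 'dart:async';

/// Hypothetical stand-in for the question's sliding(n): emits every run of
/// [n] consecutive tokens joined by a single space.
/// Each call creates its own window, so bind a given transformer to one stream only.
StreamTransformer<String, String> sliding(int n) {
  final window = <String>[];
  return StreamTransformer<String, String>.fromHandlers(
    handleData: (token, sink) {
      window.add(token);
      if (window.length > n) window.removeAt(0); // drop the oldest token
      if (window.length == n) sink.add(window.join(' '));
    },
  );
}

Future<void> main() async {
  // Assumed stand-in for charSplitter + tokenizer: split a string on spaces.
  final tokens = Stream.fromIterable('the quick brown fox'.split(' '));
  print(await tokens.transform(sliding(2)).toList());
  // [the quick, quick brown, brown fox]
}
```

Because fromHandlers binds by listening to the source immediately, a transformer built this way subscribes to a broadcast stream as soon as its own output is listened to.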

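As you can see, I read the file, split it into single-word tokens, turn that into a broadcast stream, derive the 2- and 3-word token streams from it, and then forward every event from all three streams into one StreamController.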

It works, but I don't like that I have to add each element from the source streams by hand via the StreamController.add method. I wanted to use StreamController.addStream instead, but that does not work. The following code gives me a "Bad state: Cannot add event while adding a stream" error, and I understand why:

  StreamController<String> merger = StreamController();
  merger.addStream(tokens);
  merger.addStream(twoWordTokens);
  merger.addStream(threeWordTokens);
  merger.stream.forEach(print);
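
The error comes from the controller's state check: once addStream has started, both add and a second addStream throw a StateError until the future returned by the first call completes. A small self-contained sketch that reproduces it with two in-memory streams (secondAddStreamError is just an illustrative name):

```dart
import 'dart:async';

/// Returns the message of the StateError thrown by a second, concurrent addStream call.
String? secondAddStreamError() {
  final merger = StreamController<String>();
  merger.addStream(Stream.fromIterable(['a', 'b'])); // first addStream is still pending
  try {
    merger.addStream(Stream.fromIterable(['c'])); // throws synchronously
    return null;
  } on StateError catch (e) {
    return e.message;
  }
}

void main() {
  print(secondAddStreamError()); // Cannot add event while adding a stream
}
```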

This is documented behavior of StreamController.addStream: while a stream is being added, no other events can be added to the controller. So I need to wait for the future returned by each addStream call to complete:

  StreamController<String> merger = StreamController();
  await merger.addStream(tokens);
  await merger.addStream(twoWordTokens);
  await merger.addStream(threeWordTokens);
  await merger.stream.forEach(print);

But in this case nothing is printed to the console.
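
A likely reason: a single-subscription StreamController that has no listener yet reports itself as paused (see its isPaused documentation), and addStream then pauses its subscription to the source. The future returned by addStream therefore cannot complete until someone listens, so the first await waits forever and the line that would listen is never reached. A sketch that checks this with a timeout (addStreamCompletion is an illustrative name, and the 100 ms timeout is an arbitrary choice):

```dart
import 'dart:async';

/// Reports whether addStream completed before and after a listener was attached.
Future<List<bool>> addStreamCompletion() async {
  final merger = StreamController<String>();
  final added = merger.addStream(Stream.fromIterable(['a', 'b']));

  // No listener yet: the controller is paused, so the source is never drained.
  final beforeListen = await added
      .then((_) => true)
      .timeout(const Duration(milliseconds: 100), onTimeout: () => false);

  // Attaching a listener resumes the source and lets addStream finish.
  merger.stream.listen((_) {});
  final afterListen = await added
      .then((_) => true)
      .timeout(const Duration(milliseconds: 100), onTimeout: () => false);

  return [beforeListen, afterListen];
}

Future<void> main() async {
  print(await addStreamCompletion()); // [false, true]
}
```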

If I do this:

  StreamController<String> merger = StreamController();
  merger.stream.forEach(print);
  await merger.addStream(tokens);
  await merger.addStream(twoWordTokens);
  await merger.addStream(threeWordTokens);

Then only the 1-word tokens, i.e. the elements of the original broadcast stream, are printed. The elements of the derived streams are not.

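I think I understand why this happens: all my streams are derived from the original broadcast stream, so by the time addStream subscribes to twoWordTokens, the broadcast stream has already emitted all of its events, and a broadcast stream does not replay past events to late listeners.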
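
That reasoning can be checked in isolation. Once a stream created by asBroadcastStream is done, any new listener, including one attached through a derived stream such as map or transform, only receives the done event. A minimal sketch with an in-memory source standing in for the real tokens (lateListenerDemo is an illustrative name):

```dart
import 'dart:async';

/// Listens to a broadcast stream twice in sequence, as the awaited addStream calls do.
Future<List<List<String>>> lateListenerDemo() async {
  final tokens = Stream.fromIterable(['a', 'b', 'c']).asBroadcastStream();

  // The first listener drains the source completely.
  final first = await tokens.toList();

  // A derived stream subscribed afterwards sees only the done event.
  final afterDone = await tokens.map((t) => t.toUpperCase()).toList();

  return [first, afterDone];
}

Future<void> main() async {
  print(await lateListenerDemo()); // [[a, b, c], []]
}
```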

Is there a better way to implement such a pipeline?

Perhaps my problem can be reformulated in terms of stream duplication (forking), but I can't see a way to clone a stream in Dart. If you can advise on that, please do.
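
dart:async has no built-in way to clone a stream, but one can be approximated by listening to the source once and forwarding every event into several controllers. The helper below (tee is a hypothetical name, not a library function) is only a sketch: each copy buffers events until it is listened to, so a copy consumed late, or never, can hold the whole stream in memory, and pausing a copy does not pause the source.

```dart
import 'dart:async';

/// Hypothetical helper: splits [source] into [count] independent copies.
/// Each copy is a single-subscription stream that buffers events until listened to.
List<Stream<T>> tee<T>(Stream<T> source, int count) {
  final controllers = List.generate(count, (_) => StreamController<T>());
  source.listen(
    (event) {
      for (final c in controllers) {
        c.add(event);
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      for (final c in controllers) {
        c.addError(error, stackTrace);
      }
    },
    onDone: () {
      for (final c in controllers) {
        c.close();
      }
    },
  );
  return [for (final c in controllers) c.stream];
}

Future<void> main() async {
  final copies = tee(Stream.fromIterable(['a', 'b', 'c']), 2);
  // Consume the copies one after the other: the second still sees every event.
  print(await copies[0].toList()); // [a, b, c]
  print(await copies[1].toList()); // [a, b, c]
}
```

Because the copies buffer, consuming them one after another (as the sequential awaited addStream calls do) no longer loses events.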

Upvotes: 1

Answers (1)

lrn

I hope to allow concurrent addStream calls at some point, but until then, you need to handle the events independently:

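  // Each forEach listens to its stream immediately and forwards events to merger.
  var allAdds = [
    tokens.forEach(merger.add),
    twoWordTokens.forEach(merger.add),
    threeWordTokens.forEach(merger.add),
  ];
  // Close the merged stream once all three sources are done.
  Future.wait(allAdds).then((_) { merger.close(); });

  merger.stream.forEach(print);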
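
For reference, a self-contained version of this approach that runs with only dart:async. The in-memory word list and the sliding transformer are assumptions standing in for the question's file reading, charSplitter, tokenizer and sliding(n); only the merging part mirrors the answer, and mergedTokens is an illustrative name.

```dart
import 'dart:async';

/// Hypothetical stand-in for the question's sliding(n).
StreamTransformer<String, String> sliding(int n) {
  final window = <String>[];
  return StreamTransformer<String, String>.fromHandlers(
    handleData: (token, sink) {
      window.add(token);
      if (window.length > n) window.removeAt(0);
      if (window.length == n) sink.add(window.join(' '));
    },
  );
}

/// Merges 1-, 2- and 3-word tokens into one stream, as in the answer.
Stream<String> mergedTokens(List<String> words) {
  final tokens = Stream.fromIterable(words).asBroadcastStream();
  final twoWordTokens = tokens.transform(sliding(2));
  final threeWordTokens = tokens.transform(sliding(3));

  final merger = StreamController<String>();
  // All three forEach calls subscribe synchronously, before any token is emitted.
  final allAdds = [
    tokens.forEach(merger.add),
    twoWordTokens.forEach(merger.add),
    threeWordTokens.forEach(merger.add),
  ];
  // Close the merged stream once every source is done.
  Future.wait(allAdds).then((_) => merger.close());
  return merger.stream;
}

Future<void> main() async {
  await mergedTokens(['a', 'b', 'c']).forEach(print);
}
```

The interleaving of 1-, 2- and 3-word tokens in the output follows event delivery order, so code that needs a particular order should not rely on it.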

That's if you want to control everything yourself. You can also use the StreamGroup class from package:async. It collects a number of streams and emits their events as a single stream.
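
A minimal sketch of the StreamGroup route, assuming package:async is added as a dependency in pubspec.yaml. StreamGroup.merge subscribes to all of the given streams when the merged stream is listened to, and closes it once they are all done. The upper-case and doubled streams below are only placeholders for the real 2- and 3-word token streams, and mergeWithStreamGroup is an illustrative name.

```dart
import 'package:async/async.dart' show StreamGroup;

/// Merges several streams derived from one broadcast stream with StreamGroup.merge.
Future<List<String>> mergeWithStreamGroup() {
  final tokens = Stream.fromIterable(['a', 'b', 'c']).asBroadcastStream();
  // Placeholder derived streams; the real code would use transform(sliding(n)).
  final upper = tokens.map((t) => t.toUpperCase());
  final doubled = tokens.map((t) => '$t$t');
  return StreamGroup.merge([tokens, upper, doubled]).toList();
}

Future<void> main() async {
  print(await mergeWithStreamGroup());
}
```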

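This assumes that you have no error events: forEach stops at the first error and completes its future with that error, so the future from Future.wait fails and merger.close() is never called.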
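
If the sources can emit errors, one option (a sketch, not the only way) is to use listen instead of forEach, forward errors with addError, and count done events to decide when to close the merged stream. mergeWithErrors is a hypothetical name:

```dart
import 'dart:async';

/// Hypothetical helper: merges [sources], forwarding both data and error events.
/// The merged stream closes after every source has sent its done event.
Stream<T> mergeWithErrors<T>(List<Stream<T>> sources) {
  final merger = StreamController<T>();
  var remaining = sources.length;
  if (remaining == 0) {
    merger.close();
    return merger.stream;
  }
  for (final source in sources) {
    source.listen(
      merger.add,
      onError: merger.addError, // keep going after an error instead of stopping
      onDone: () {
        remaining--;
        if (remaining == 0) merger.close();
      },
    );
  }
  return merger.stream;
}

Future<void> main() async {
  final merged = mergeWithErrors<int>([
    Stream.fromIterable([1, 2]),
    Stream<int>.error('boom'),
  ]);
  await merged.handleError((Object e) => print('error: $e')).forEach(print);
}
```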

Upvotes: 1