J Sorel

Reputation: 353

Dart Streams unsubscribe

Hello!

I'm trying to understand how streams work in Dart.
Here is a simple example:

  1. We have a Publisher:

    class Publisher {
    
        StreamController<String> _publishCtrl = new StreamController<String>();
        Stream<String> onPublish;
    
        Publisher() {
            onPublish = _publishCtrl.stream.asBroadcastStream();
        }
    
        void publish(String s) {
            _publishCtrl.add(s);
        }
    }
    
  2. And a Reader:

    class Reader {
        String name;
        Reader(this.name);
        read(String s) {
            print("My name is $name. I read string '$s'");
        }
    }
    
  3. And a simple main() function:

    main() {
        Publisher publisher = new Publisher();
    
        Reader john = new Reader('John');
        Reader smith = new Reader('Smith');
    
        publisher.onPublish.listen(john.read);
        publisher.onPublish.listen(smith.read);
    
        for (var i = 0; i < 5; i++) {
            publisher.publish("Test message $i");
        }
    }
    

Running this code prints 5 messages from reader John and 5 messages from reader Smith:

    My name is John. I read string 'Test message 0'
    My name is Smith. I read string 'Test message 0'
    My name is John. I read string 'Test message 1'
    My name is Smith. I read string 'Test message 1'
    My name is John. I read string 'Test message 2'
    My name is Smith. I read string 'Test message 2'
    My name is John. I read string 'Test message 3'
    My name is Smith. I read string 'Test message 3'
    My name is John. I read string 'Test message 4'
    My name is Smith. I read string 'Test message 4'

Everything works as expected. But if I change the for loop so that reader Smith stops receiving messages after a few steps, only reader John receives any messages at all.
Here is the modified main() function:

    main() {
        Publisher publisher = new Publisher();

        Reader john = new Reader('John');
        Reader smith = new Reader('Smith');

        publisher.onPublish.listen(john.read);
        var smithSub = publisher.onPublish.listen(smith.read);

        for (var i = 0; i < 5; i++) {
            publisher.publish("Test message $i");

            if (i > 2) {
                smithSub.cancel();
            }
        }
    }

If you run this code, the console shows only the messages read by John:

    My name is John. I read string 'Test message 0'
    My name is John. I read string 'Test message 1'
    My name is John. I read string 'Test message 2'
    My name is John. I read string 'Test message 3'
    My name is John. I read string 'Test message 4'

But I think there should also be 3 messages from reader Smith.

Is my understanding correct? If not, please help me understand why this happens.

Thank you very much.

Upvotes: 2

Views: 1505

Answers (1)

Günter Zöchbauer

Reputation: 657178

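By default a StreamController delivers its events asynchronously, so in your loop Smith's subscription is cancelled before any of the queued messages has been delivered.

Either create a sync StreamController, so that each event is delivered to the listeners during the call to add():

    StreamController<String> _publishCtrl = new StreamController<String>(sync: true);

or let the controller deliver the pending items before you publish the next one:

    // `subscriptions` is assumed to be a list holding the
    // StreamSubscriptions that should be cancelled.
    int i = 0;
    Future.doWhile(() {
      i++;

      publisher.publish("Test message $i");

      if (i > 2) {
        subscriptions
          ..forEach((s) => s.cancel())
          ..clear();
      }
      // Returning a Future (instead of a plain bool) lets the event loop
      // run, and pending events be delivered, before the next iteration.
      return new Future(() => i < 5);
    });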
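
To see the effect of both options side by side, here is a minimal, self-contained sketch. It is not the original Publisher/Reader code: the `run` function, its `sync` and `yieldBetween` switches, and the `smithCount` helper are names made up for this illustration, and the listeners record into a list instead of printing. It assumes a current null-safe Dart SDK.

```dart
import 'dart:async';

/// Publishes five messages and cancels Smith's subscription once i > 2,
/// as in the question. Returns what the listeners recorded, in order.
///
/// [sync] and [yieldBetween] are hypothetical switches for this sketch.
Future<List<String>> run({bool sync = false, bool yieldBetween = false}) async {
  final log = <String>[];
  final ctrl = StreamController<String>(sync: sync);
  final onPublish = ctrl.stream.asBroadcastStream();

  onPublish.listen((s) => log.add('John: $s'));
  final smithSub = onPublish.listen((s) => log.add('Smith: $s'));

  for (var i = 0; i < 5; i++) {
    ctrl.add('Test message $i');
    if (yieldBetween) {
      // Let the event loop deliver the pending event before going on.
      await Future<void>.delayed(Duration.zero);
    }
    if (i > 2) {
      smithSub.cancel(); // Calling cancel() again later is harmless.
    }
  }

  // Give any events that are still queued a chance to be delivered.
  await Future<void>.delayed(Duration.zero);
  await ctrl.close();
  return log;
}

/// Counts the log lines recorded by Smith's listener.
int smithCount(List<String> log) =>
    log.where((line) => line.startsWith('Smith')).length;

Future<void> main() async {
  final asyncLog = await run();
  final syncLog = await run(sync: true);
  final yieldLog = await run(yieldBetween: true);

  print('default controller:   Smith read ${smithCount(asyncLog)} messages');
  print('sync controller:      Smith read ${smithCount(syncLog)} messages');
  print('yield between events: Smith read ${smithCount(yieldLog)} messages');
}
```

With the default controller Smith should read nothing, which matches the output in the question. With either fix Smith should read messages 0 through 3, that is 4 messages rather than 3, because `i > 2` first becomes true at `i == 3`, after message 3 has already been delivered. Use `i >= 2` if Smith should stop after exactly three messages.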

Upvotes: 3
