BINAY THAPA MAGAR
BINAY THAPA MAGAR

Reputation: 4331

Flutter : Bad state: Stream has already been listened to


    class MyPage extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return DefaultTabController(
          length: 2,
          child: new Scaffold(
            appBar: TabBar(
              tabs: [
                Tab(child: Text("MY INFORMATION",style: TextStyle(color: Colors.black54),)),
                Tab(child: Text("WEB CALENDER",style: TextStyle(color: Colors.black54),)),
              ],
            ),
            body:PersonalInformationBlocProvider(
              movieBloc: PersonalInformationBloc(),
              child: TabBarView(
                children: [
                  MyInformation(),
                  new SmallCalendarExample(),
                ],
              ),
            ),
          ),
        );
      }
    }
    
    class MyInformation extends StatelessWidget{
      // TODO: implement build
      var deviceSize;
    
      //Column1
      Widget profileColumn(PersonalInformation snapshot) => Container(
        height: deviceSize.height * 0.24,
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Row(
              mainAxisAlignment: MainAxisAlignment.spaceEvenly,
              children: <Widget>[
                Container(
                  decoration: BoxDecoration(
                    borderRadius:
                    new BorderRadius.all(new Radius.circular(50.0)),
                    border: new Border.all(
                      color: Colors.black,
                      width: 4.0,
                    ),
                  ),
                  child: CircleAvatar(
                    backgroundImage: NetworkImage(
                        "http://www.binaythapa.com.np/img/me.jpg"),
                    foregroundColor: Colors.white,
                    backgroundColor: Colors.white,
                    radius: 40.0,
                  ),
                ),
                ProfileTile(
                  title: snapshot.firstName,
                  subtitle: "Developer",
                ),
                SizedBox(
                  height: 10.0,
                ),
              ],
            )
          ],
        ),
      );
      Widget bodyData(PersonalInformation snapshot) {
        return SingleChildScrollView(
            child: Column(
              children: <Widget>[
                profileColumn(snapshot)
              ],
            ),
        );
      }
    
    
      @override
      Widget build(BuildContext context) {
        final personalInformationBloc = PersonalInformationBlocProvider.of(context);
    
        deviceSize = MediaQuery.of(context).size;
        return StreamBuilder(
            stream: personalInformationBloc.results,
            builder: (context,snapshot){
              if (!snapshot.hasData)
                return Center(
                  child: CircularProgressIndicator(),
                );
              return bodyData(snapshot.data);
            }
        );
      }
    }
   

I am using Bloc Pattern for retrieving data from Rest API (just called the whole object from JSON and parsed user name only). The Page consists of two tabs MyInformation and SmallCalendar. When the app runs the data are fetched correctly and everything is good. When I go to tab two and return to tab one then the whole screens in tab one goes to red showing error: Bad state: Stream has already been listened to.

Upvotes: 155

Views: 146056

Answers (23)

Jože Ws
Jože Ws

Reputation: 1814

I solved by using the Stream.fromIterable constructor instead of Stream.fromFutures, which doesn't have the single-subscription constraint the latter has:

/// Create a single-subscription stream from a group of futures.

Upvotes: 0

Zahra
Zahra

Reputation: 2658

I had this error when I wanted to resend a http request in intereptors and creating Response from the result which was a StreamedResponse like this:

final StreamedResponse streamedResponse = await lastRequest.send();
    var response = Response.fromStream(streamedResponse); //this will have error

So I created a copy of the stream like this and the problem solved:

 var response = Response.fromStream(streamedResponse.copyWith());

Upvotes: 0

Titas Černiauskas
Titas Černiauskas

Reputation: 1446

If WebSocket channel itself not support multiple listeners then you can work around by maintaining a list of listeners in your WebSocketService class and using a StreamController to manage the stream.

Here's an example of how you might adjust your WebSocketService class to handle multiple listeners:

import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';

class WebSocketService {
  final String _url;
  WebSocketChannel _channel;
  StreamController<dynamic> _controller = StreamController.broadcast();

  WebSocketService(this._url);

  Future<void> connect() async {
    _channel = IOWebSocketChannel.connect(_url);
    _channel.stream.listen((dynamic message) {
      _controller.add(message);
    });
  }


  Stream<dynamic> get onMessage => _controller.stream;

}

Upvotes: 1

Alfon Chito Salano
Alfon Chito Salano

Reputation: 9

i have experienced this, always closing the streamcontroller worked for me.

Upvotes: 0

Charles Arko-Clayman
Charles Arko-Clayman

Reputation: 336

I experienced this because, i was using a stream builder to create a list for tabs of tabview and anytime i switch tabs and come back to the previous i get this error. "wrapping the stream builder with a builder widget" did the magic for me.

Upvotes: 2

Folorunso
Folorunso

Reputation: 165

Call .broadcast() on your stream controller

example:

StreamController<T> sampleController =
      StreamController<T>.broadcast();

Upvotes: 4

tonyslowdown
tonyslowdown

Reputation: 14667

I was getting this error when navigating away and then back to the view listening to the stream because I was pushing a new instance of the same view into the Navigator stack, which effectively ended up creating a new listener even though it was the same place in code.

Specifically and in more detail, I had a ListItemsView widget which uses StreamBuilder to show all the items in a stream. User taps on the "Add Item" button which pushes the AddItemView in the Navigator stack, and after submitting the form, the user is brought back to the ListItemsView, where the "Bad state: Stream has already been listened to." error happens.

For me the fix was to replace Navigator.pushNamed(context, ListItemsView.routeName) with Navigator.pop(context). This effectively prevents the instantiation of a new ListItemsView (as the second subscriber to the same stream), and just takes the user back to the previous ListItemsView instance.

Upvotes: 0

tensor
tensor

Reputation: 783

make sure you dispose controllers!

@override
  void dispose() {
    scrollController.dispose();
    super.dispose();
  }

Upvotes: 0

Samer
Samer

Reputation: 4196

In my case, I was getting this error because the same line of code myStream.listen() was being called twice in the same widget on the same stream. Apparently this is not allowed!

UPDATE: If you intend to subscribe to the same stream more than once, you should use a behavior subject instead:


// 1- Create a behavior subject
final _myController = BehaviorSubject<String>();

// 2- To emit/broadcast new events, we will use Sink of the behavior subject.
Sink<String> get mySteamInputSink => _myController.sink;

// 3- To listen/subscribe to those emitted events, we will use Stream (observable) of the behavior subject. 
Stream<String> get myStream => _myController.stream;

// 4- Firstly, Listen/subscribe to stream events.
myStream.listen((latestEvent) {
   // use latestEvent data here.
});

// 5- Emit new events by adding them to the BehaviorSubject's Sink. 
myStreamInputSink.add('new event');

That's it!

However, there is one final important step.

6- We must unsubscribe from all stream listeners before a widget is destroyed.

Why? (You might ask)

Because if a widget subscribes to a stream, and when this widget is destroyed, the destroyed widget stream subscription will remain in app memory causing memory leaks and unpredictable behavior.:

_flush() {
  _myController.close();
  _myController = StreamController<String>();
}

############################### ###############################

Old Answer:

What fixed it for me is to both create a my stream controller as a broadcast stream controller:

var myStreamController = StreamController<bool>.broadcast();

AND

use stream as a broadcast stream:

myStreamController.stream.asBroadcastStream().listen(onData);

Upvotes: 37

user3056783
user3056783

Reputation: 2616

I think not all of the answers take into account the situation where you do not want or simply can't use broadcast stream.

More often than not, you have to rely on receiving past events because the listener might be created later than the stream it listens to and it's important to receive such information.

In Flutter what will often happen is that widget listening to the stream ("listener") gets destroyed and built again. If you attempt to attach listener to the same stream as before, you will get this error.

To overcome this, you will have to manage your streams manually. I created this gist demonstrating how that can be done. You can also run this code on this dartpad to see how it behaves and play with it. I have used simple String ids to refer to specific StreamController instances but there might be better solutions too (perhaps symbols).

The code from the gist is:

/* NOTE: This approach demonstrates how to recreate streams when
         your listeners are being recreated.
         It is useful when you cannot or do not want to use broadcast
         streams. Downside to broadcast streams is that it is not
         guaranteed that your listener will receive values emitted
         by the stream before it was registered.
*/

import 'dart:async';
import 'dart:math';

// [StreamService] manages state of your streams. Each listener
// must have id which is used in [_streamControllers] map to
// look up relevant stream controller.
class StreamService {
  final Map<String, StreamController<int>?> _streamControllers = {};

  Stream<int> getNamedStream(String id) {
    final controller = _getController(id);
    return controller.stream;
  }

  // Will get existing stream controller by [id] or create a new
  // one if it does not exist
  StreamController<int> _getController(String id) {
    final controller = _streamControllers[id] ?? _createController();

    _streamControllers[id] = controller;

    return controller;
  }

  void push(String id) {
    final controller = _getController(id);

    final rand = Random();
    final value = rand.nextInt(1000);

    controller.add(value);
  }

  // This method can be called by listener so
  // memory leaks are avoided. This is a cleanup
  // method that will make sure the stream controller
  // is removed safely
  void disposeController(String id) {
    final controller = _streamControllers[id];

    if (controller == null) {
      throw Exception('Controller $id is not registered.');
    }

    controller.close();
    _streamControllers.remove(id);
    print('Removed controller $id');
  }

  // This method should be called when you want to remove
  // all controllers. It should be called before the instance
  // of this class is garbage collected / removed from memory.
  void dispose() {
    _streamControllers.forEach((id, controller) {
      controller?.close();
      print('Removed controller $id during dispose phase');
    });
    _streamControllers.clear();
  }

  StreamController<int> _createController() {
    return StreamController<int>();
  }
}

class ManagedListener {
  ManagedListener({
    required this.id,
    required StreamService streamService,
  }) {
    _streamService = streamService;
  }

  final String id;
  late StreamService _streamService;
  StreamSubscription<int>? _subscription;

  void register() {
    _subscription = _streamService.getNamedStream(id).listen(_handleStreamChange);
  }

  void dispose() {
    _subscription?.cancel();
    _streamService.disposeController(id);
  }

  void _handleStreamChange(int n) {
    print('[$id]: streamed $n');
  }
}

void main(List<String> arguments) async {
  final streamService = StreamService();

  final listener1Id = 'id_1';
  final listener2Id = 'id_2';

  final listener1 = ManagedListener(id: listener1Id, streamService: streamService);
  listener1.register();
  
  streamService.push(listener1Id);
  streamService.push(listener1Id);
  streamService.push(listener1Id);

  await Future.delayed(const Duration(seconds: 1));

  final listener2 = ManagedListener(id: listener2Id, streamService: streamService);
  listener2.register();

  streamService.push(listener2Id);
  streamService.push(listener2Id);

  await Future.delayed(const Duration(seconds: 1));
  
  listener1.dispose();
  listener2.dispose();

  streamService.dispose();
}

Upvotes: 4

SanShar
SanShar

Reputation: 1

For other case scenarios. Watch out if you are somehow using a stream inside a stateless class. This is one of the reasons you get the above error. Convert the stateless class to stateful and call init and dispose method on the streamController:

 @override
 void initState() {
   super.initState();
   YourStreamController.init();
 }

 @override
 void dispose() {
   YourStreamController.dispose();
   super.dispose();
 }

Upvotes: 0

b.john
b.john

Reputation: 803

This could help any other person. In my case i was using two StreamBuilder one in each tab. So when i swipe to above the tab and back. The other stream was already listened so i get the error.

What i did was to remove the StreamBuilder from the tabs and put it on top. I setState each time there is a change. I return an empty Text('') to avoid showing anything. I hope this methods

Upvotes: 0

abhijith k
abhijith k

Reputation: 399

This is a problem for the provider, I solved it by change provider initialization

Eg

locator.registerSingleton<LoginProvider>(LoginProvider());

TO

 locator.registerFactory(() => TaskProvider());

Where locator is

GetIt locator = GetIt.instance;

Upvotes: 0

Andrey Gordeev
Andrey Gordeev

Reputation: 32449

You could use broadcast, which allows to listen stream more than once, but it also prevents from listening past events:

Broadcast streams do not buffer events when there is no listener.

A better option is to use BehaviorSubject from rxdart package class as StreamController. BehaviorSubject is:

A special StreamController that captures the latest item that has been added to the controller, and emits that as the first item to any new listener.

The usage is as simple as:

StreamController<...> _controller = BehaviorSubject();

Upvotes: 50

nani
nani

Reputation: 203

In my case I was Using the Package Connectivity while on flutter web. Commenting all Connectivity calls solved the issue.

I'm now just using Connectivity while only on Android/iOS.

So maybe check your Packages im you are using some packages that have some issues on Web in case you are developing for web.

Hopefully I could help someone with this Information.

Upvotes: 0

Serhii
Serhii

Reputation: 129

I have had the same issue when I used a result of Observable.combineLatest2 for StreamBuilder into Drawer:

flutter: Bad state: Stream has already been listened to.

As for me, the best solution has added the result of this combine to new BehaviorSubject and listen new one.

Don't forget to listen old one !!!

class VisitsBloc extends Object {
    Map<Visit, Location> visitAndLocation;

    VisitsBloc() {
        visitAndLocations.listen((data) {
            visitAndLocation = data;
        });
    }

    final _newOne = new BehaviorSubject<Map<Visit, Location>>();

    Stream<Map<Visit, Location>> get visitAndLocations => Observable.combineLatest2(_visits.stream, _locations.stream, (List<vis.Visit> visits, Map<int, Location> locations) {
        Map<vis.Visit, Location> result = {};

        visits.forEach((visit) {
            if (locations.containsKey(visit.skuLocationId)) {
                result[visit] = locations[visit.skuLocationId];
            }
        });

        if (result.isNotEmpty) {
            _newOne.add(result);
        }
    });
}

I didn't use .broadcast because it slowed my UI.

Upvotes: 4

G&#252;nter Z&#246;chbauer
G&#252;nter Z&#246;chbauer

Reputation: 657148

StreamSplitter.split() from the async can be used for this use case

import 'package:async/async.dart';
...

main() {
  var process = Process.start(...);
  var stdout = StreamSplitter<List<int>>(process.stdout);
  readStdoutFoo(stdout.split());
  readStdoutBar(stdout.split());
}

readStdoutFoo(Stream<List<int>> stdout) {
  stdout.transform(utf8.decoder)...
}

readStdoutBar(Stream<List<int>> stdout) {
  stdout.transform(utf8.decoder)...
}

Upvotes: 0

Sanskar Tiwari
Sanskar Tiwari

Reputation: 37

For me defining my stream as a global variable worked

Stream infostream (was inside the ...State in a stateful widget i defined it outside the widget and it worked

(not sure if the best solution but give it a try)

Upvotes: 1

E. Sun
E. Sun

Reputation: 1344

For those of you running into this while doing Future.asStream(), you'll need Future.asStream().shareReplay(maxSize: 1) to make it a broadcast/hot stream.

Upvotes: 1

haroldolivieri
haroldolivieri

Reputation: 2283

Just to sum up:

The main difference is broadcast() creates a Stream listenable for multiple sources but it needs to be listened for at least one source to start emitting items.

A Stream should be inert until a subscriber starts listening on it (using the [onListen] callback to start producing events).

asBroadcastStream turns an existing Stream into a multi listenable one but it doesn't need to be listened to start emitting since it calls onListen() under the hood.

Upvotes: 4

hrsma2i
hrsma2i

Reputation: 4562

You should use the following.

StreamController<...> _controller = StreamController<...>.broadcast();

Upvotes: 171

BINAY THAPA MAGAR
BINAY THAPA MAGAR

Reputation: 4331

The problem was due to not disposing the controllers in bloc.

  void dispose() {
    monthChangedController.close();
    dayPressedController.close();
    resultController.close();
  }

Upvotes: 22

R&#233;mi Rousselet
R&#233;mi Rousselet

Reputation: 276949

The most common form of Stream can be listened only once at a time. If you try to add multiple listeners, it will throw

Bad state: Stream has already been listened to

To prevent this error, expose a broadcast Stream. You can convert your stream to a broadcast using myStream.asBroadcastStream

This needs to be done inside your class that expose Stream. Not as parameter of StreamBuilder. Since asBroadcastStream internally listen to the original stream to generate the broadcast one, this imply you can't call this method twice on the same stream.

Upvotes: 85

Related Questions