Reputation: 4331
class MyPage extends StatelessWidget {
@override
Widget build(BuildContext context) {
return DefaultTabController(
length: 2,
child: new Scaffold(
appBar: TabBar(
tabs: [
Tab(child: Text("MY INFORMATION",style: TextStyle(color: Colors.black54),)),
Tab(child: Text("WEB CALENDER",style: TextStyle(color: Colors.black54),)),
],
),
body:PersonalInformationBlocProvider(
movieBloc: PersonalInformationBloc(),
child: TabBarView(
children: [
MyInformation(),
new SmallCalendarExample(),
],
),
),
),
);
}
}
class MyInformation extends StatelessWidget{
// TODO: implement build
var deviceSize;
//Column1
Widget profileColumn(PersonalInformation snapshot) => Container(
height: deviceSize.height * 0.24,
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
Row(
mainAxisAlignment: MainAxisAlignment.spaceEvenly,
children: <Widget>[
Container(
decoration: BoxDecoration(
borderRadius:
new BorderRadius.all(new Radius.circular(50.0)),
border: new Border.all(
color: Colors.black,
width: 4.0,
),
),
child: CircleAvatar(
backgroundImage: NetworkImage(
"http://www.binaythapa.com.np/img/me.jpg"),
foregroundColor: Colors.white,
backgroundColor: Colors.white,
radius: 40.0,
),
),
ProfileTile(
title: snapshot.firstName,
subtitle: "Developer",
),
SizedBox(
height: 10.0,
),
],
)
],
),
);
Widget bodyData(PersonalInformation snapshot) {
return SingleChildScrollView(
child: Column(
children: <Widget>[
profileColumn(snapshot)
],
),
);
}
@override
Widget build(BuildContext context) {
final personalInformationBloc = PersonalInformationBlocProvider.of(context);
deviceSize = MediaQuery.of(context).size;
return StreamBuilder(
stream: personalInformationBloc.results,
builder: (context,snapshot){
if (!snapshot.hasData)
return Center(
child: CircularProgressIndicator(),
);
return bodyData(snapshot.data);
}
);
}
}
I am using the BLoC pattern to retrieve data from a REST API (I fetch the whole JSON object and parse only the user name). The page consists of two tabs, MyInformation and SmallCalendar. When the app starts, the data is fetched correctly and everything works. When I switch to tab two and then return to tab one, the whole of tab one turns red and shows this error:
Bad state: Stream has already been listened to.
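A minimal sketch of what produces this message, using only dart:async and no Flutter. The function name secondListenError is made up for illustration. It relies on the fact that a default StreamController exposes a single-subscription stream, so a second listen() call throws a StateError:

```dart
import 'dart:async';

/// Returns the message of the error raised by a second listen() call,
/// or null if no error was raised. (Hypothetical helper for illustration.)
String? secondListenError() {
  // A default StreamController produces a single-subscription stream.
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    // Second subscription on the same single-subscription stream.
    controller.stream.listen((_) {});
  } on StateError catch (e) {
    return e.message;
  } finally {
    controller.close();
  }
  return null;
}

void main() {
  print(secondListenError());
}
```

In the tab scenario above, the second listen() presumably comes from the StreamBuilder that is created again when tab one is rebuilt.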
Upvotes: 155
Views: 146056
Reputation: 1814
I solved it by using the Stream.fromIterable
constructor instead of Stream.fromFutures
, which doesn't have the single-subscription constraint the latter has:
/// Create a single-subscription stream from a group of futures.
Upvotes: 0
Reputation: 2658
I got this error when I tried to resend an HTTP request in an interceptor and create a Response
from the result, which was a StreamedResponse
, like this:
final StreamedResponse streamedResponse = await lastRequest.send();
var response = Response.fromStream(streamedResponse); // this throws the error
So I created a copy of the stream like this, and that solved the problem:
var response = Response.fromStream(streamedResponse.copyWith());
Upvotes: 0
Reputation: 1446
If the WebSocket channel itself does not support multiple listeners, you can work around it by listening to the channel once in your WebSocketService class and using a broadcast StreamController to manage the stream.
Here's an example of how you might adjust your WebSocketService class to handle multiple listeners:
import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
class WebSocketService {
final String _url;
// Assigned in connect(); `late` keeps this valid under null safety.
late WebSocketChannel _channel;
final StreamController<dynamic> _controller = StreamController<dynamic>.broadcast();
WebSocketService(this._url);
Future<void> connect() async {
_channel = IOWebSocketChannel.connect(_url);
_channel.stream.listen((dynamic message) {
_controller.add(message);
});
}
Stream<dynamic> get onMessage => _controller.stream;
}
Upvotes: 1
Reputation: 9
I have experienced this; always closing the StreamController worked for me.
Upvotes: 0
Reputation: 336
I ran into this because I was using a StreamBuilder to build the list shown in one tab of a TabBarView, and whenever I switched tabs and came back to the previous one I got this error. Wrapping the StreamBuilder in a Builder widget did the trick for me.
Upvotes: 2
Reputation: 165
Create your stream controller with the StreamController.broadcast() constructor.
Example:
StreamController<T> sampleController =
StreamController<T>.broadcast();
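As a rough sketch of the effect, using only dart:async: with a broadcast controller, two listeners can subscribe to the same stream without triggering the error. The function name and the 'a:' / 'b:' labels are illustrative only.

```dart
import 'dart:async';

/// Attaches two listeners to one broadcast stream and records what each
/// one receives. (Hypothetical helper for illustration.)
Future<List<String>> broadcastToTwoListeners() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast();

  // Both listen() calls succeed because the stream is a broadcast stream.
  controller.stream.listen((v) => log.add('a:$v'));
  controller.stream.listen((v) => log.add('b:$v'));

  controller.add(1);
  controller.add(2);

  // close() completes once the done event has reached the listeners.
  await controller.close();
  return log;
}

Future<void> main() async {
  // Each listener receives both events, so four entries are logged.
  print(await broadcastToTwoListeners());
}
```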
Upvotes: 4
Reputation: 14667
I was getting this error when navigating away and then back to the view listening to the stream because I was pushing a new instance of the same view into the Navigator stack, which effectively ended up creating a new listener even though it was the same place in code.
Specifically and in more detail, I had a ListItemsView
widget which uses StreamBuilder
to show all the items in a stream. User taps on the "Add Item" button which pushes the AddItemView
in the Navigator stack, and after submitting the form, the user is brought back to the ListItemsView
, where the "Bad state: Stream has already been listened to." error happens.
For me the fix was to replace Navigator.pushNamed(context, ListItemsView.routeName)
with Navigator.pop(context)
. This effectively prevents the instantiation of a new ListItemsView (as the second subscriber to the same stream), and just takes the user back to the previous ListItemsView instance.
Upvotes: 0
Reputation: 783
Make sure you dispose of your controllers!
@override
void dispose() {
scrollController.dispose();
super.dispose();
}
Upvotes: 0
Reputation: 4196
In my case, I was getting this error because the same line of code myStream.listen()
was being called twice in the same widget on the same stream. Apparently this is not allowed!
UPDATE: If you intend to subscribe to the same stream more than once, you should use a behavior subject instead:
// 1- Create a behavior subject (BehaviorSubject comes from the rxdart package).
// It is not final because _flush() below replaces it.
var _myController = BehaviorSubject<String>();
// 2- To emit/broadcast new events, we will use the Sink of the behavior subject.
Sink<String> get myStreamInputSink => _myController.sink;
// 3- To listen/subscribe to those emitted events, we will use the Stream (observable) of the behavior subject.
Stream<String> get myStream => _myController.stream;
// 4- First, listen/subscribe to stream events.
myStream.listen((latestEvent) {
  // use latestEvent data here.
});
// 5- Emit new events by adding them to the BehaviorSubject's Sink.
myStreamInputSink.add('new event');
That's it!
However, there is one final important step.
6- We must unsubscribe from all stream listeners before a widget is destroyed.
Why? (You might ask)
Because if a widget subscribes to a stream and is then destroyed, its stream subscription remains in app memory, causing memory leaks and unpredictable behavior:
_flush() {
  _myController.close();
  _myController = BehaviorSubject<String>();
}
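Unsubscribing can also be done per listener by keeping the StreamSubscription returned by listen() and cancelling it. The sketch below is an illustration only: it uses a plain dart:async broadcast controller instead of a BehaviorSubject so that it has no package dependencies, and the function name is made up.

```dart
import 'dart:async';

/// Listens, receives one event, cancels, and returns what was received.
/// (Hypothetical helper for illustration.)
Future<List<int>> cancelStopsDelivery() async {
  final controller = StreamController<int>.broadcast();
  final seen = <int>[];

  // Keep the subscription so it can be cancelled later,
  // for example from a State.dispose() method in Flutter.
  final subscription = controller.stream.listen(seen.add);

  controller.add(1);
  // Give the event loop a turn so the event is delivered.
  await Future<void>.delayed(Duration.zero);

  await subscription.cancel();

  // No listener is attached any more, so this event is not recorded.
  controller.add(2);
  await controller.close();
  return seen;
}

Future<void> main() async {
  print(await cancelStopsDelivery());
}
```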
############################### ###############################
What fixed it for me was to both create my stream controller as a broadcast stream controller:
var myStreamController = StreamController<bool>.broadcast();
AND
use the stream as a broadcast stream:
myStreamController.stream.asBroadcastStream().listen(onData);
Upvotes: 37
Reputation: 2616
I think not all of the answers take into account the situation where you do not want to, or simply can't, use a broadcast stream.
More often than not, you have to rely on receiving past events, because the listener might be created later than the stream it listens to and it's important to receive that information.
In Flutter, what often happens is that the widget listening to the stream (the "listener") gets destroyed and rebuilt. If you attempt to attach a listener to the same stream as before, you will get this error.
To overcome this, you will have to manage your streams manually. I created this gist demonstrating how that can be done. You can also run this code on this dartpad to see how it behaves and play with it. I have used simple String
ids to refer to specific StreamController
instances but there might be better solutions too (perhaps symbols).
The code from the gist is:
/* NOTE: This approach demonstrates how to recreate streams when
your listeners are being recreated.
It is useful when you cannot or do not want to use broadcast
streams. Downside to broadcast streams is that it is not
guaranteed that your listener will receive values emitted
by the stream before it was registered.
*/
import 'dart:async';
import 'dart:math';
// [StreamService] manages state of your streams. Each listener
// must have id which is used in [_streamControllers] map to
// look up relevant stream controller.
class StreamService {
final Map<String, StreamController<int>?> _streamControllers = {};
Stream<int> getNamedStream(String id) {
final controller = _getController(id);
return controller.stream;
}
// Will get existing stream controller by [id] or create a new
// one if it does not exist
StreamController<int> _getController(String id) {
final controller = _streamControllers[id] ?? _createController();
_streamControllers[id] = controller;
return controller;
}
void push(String id) {
final controller = _getController(id);
final rand = Random();
final value = rand.nextInt(1000);
controller.add(value);
}
// This method can be called by listener so
// memory leaks are avoided. This is a cleanup
// method that will make sure the stream controller
// is removed safely
void disposeController(String id) {
final controller = _streamControllers[id];
if (controller == null) {
throw Exception('Controller $id is not registered.');
}
controller.close();
_streamControllers.remove(id);
print('Removed controller $id');
}
// This method should be called when you want to remove
// all controllers. It should be called before the instance
// of this class is garbage collected / removed from memory.
void dispose() {
_streamControllers.forEach((id, controller) {
controller?.close();
print('Removed controller $id during dispose phase');
});
_streamControllers.clear();
}
StreamController<int> _createController() {
return StreamController<int>();
}
}
class ManagedListener {
ManagedListener({
required this.id,
required StreamService streamService,
}) {
_streamService = streamService;
}
final String id;
late StreamService _streamService;
StreamSubscription<int>? _subscription;
void register() {
_subscription = _streamService.getNamedStream(id).listen(_handleStreamChange);
}
void dispose() {
_subscription?.cancel();
_streamService.disposeController(id);
}
void _handleStreamChange(int n) {
print('[$id]: streamed $n');
}
}
void main(List<String> arguments) async {
final streamService = StreamService();
final listener1Id = 'id_1';
final listener2Id = 'id_2';
final listener1 = ManagedListener(id: listener1Id, streamService: streamService);
listener1.register();
streamService.push(listener1Id);
streamService.push(listener1Id);
streamService.push(listener1Id);
await Future.delayed(const Duration(seconds: 1));
final listener2 = ManagedListener(id: listener2Id, streamService: streamService);
listener2.register();
streamService.push(listener2Id);
streamService.push(listener2Id);
await Future.delayed(const Duration(seconds: 1));
listener1.dispose();
listener2.dispose();
streamService.dispose();
}
Upvotes: 4
Reputation: 1
This is for other scenarios. Watch out if you are using a stream inside a StatelessWidget; that is one of the reasons you can get the above error. Convert the StatelessWidget to a StatefulWidget and call the init and dispose methods on the stream controller:
@override
void initState() {
super.initState();
YourStreamController.init();
}
@override
void dispose() {
YourStreamController.dispose();
super.dispose();
}
Upvotes: 0
Reputation: 803
This could help someone else. In my case I was using two StreamBuilders, one in each tab. When I swiped to the other tab and back, the stream had already been listened to, so I got the error.
What I did was remove the StreamBuilder from the tabs and put it at the top. I call setState each time there is a change, and I return an empty Text('') to avoid showing anything. I hope this method helps.
Upvotes: 0
Reputation: 399
This is a problem with how the provider is registered. I solved it by changing the provider initialization,
e.g. from
locator.registerSingleton<LoginProvider>(LoginProvider());
to
locator.registerFactory(() => TaskProvider());
where locator is
GetIt locator = GetIt.instance;
Upvotes: 0
Reputation: 32449
You could use broadcast
, which allows a stream to be listened to more than once, but it also prevents listeners from receiving past events:
Broadcast streams do not buffer events when there is no listener.
A better option is to use BehaviorSubject
from the rxdart
package as the StreamController
. BehaviorSubject
is:
A special StreamController that captures the latest item that has been added to the controller, and emits that as the first item to any new listener.
The usage is as simple as:
StreamController<...> _controller = BehaviorSubject();
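The buffering difference mentioned above can be seen with dart:async alone. This is a sketch, and lateListener is a made-up helper name: it adds one event before anyone listens and one after, then reports what the listener received.

```dart
import 'dart:async';

/// Adds an event before and after attaching a listener and returns
/// the events the listener saw. (Hypothetical helper for illustration.)
Future<List<int>> lateListener(StreamController<int> controller) async {
  final seen = <int>[];
  controller.add(1); // added while nobody is listening
  controller.stream.listen(seen.add);
  controller.add(2); // added after the listener is attached
  await controller.close();
  return seen;
}

Future<void> main() async {
  // Broadcast: the early event is dropped.
  print(await lateListener(StreamController<int>.broadcast())); // [2]
  // Single-subscription: the early event is buffered until listen().
  print(await lateListener(StreamController<int>())); // [1, 2]
}
```

A BehaviorSubject sits between the two: it allows multiple listeners and, per the description quoted above, hands the latest item to each new listener.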
Upvotes: 50
Reputation: 203
In my case I was using the connectivity package on Flutter web. Commenting out all Connectivity calls solved the issue.
I now use Connectivity only on Android/iOS.
So if you are developing for the web, check whether any of the packages you use have issues there.
Hopefully this information helps someone.
Upvotes: 0
Reputation: 129
I had the same issue when I used the result of Observable.combineLatest2 in a StreamBuilder inside a Drawer:
flutter: Bad state: Stream has already been listened to.
For me, the best solution was to add the result of this combination to a new BehaviorSubject and listen to the new one.
Don't forget to listen to the old one!
class VisitsBloc extends Object {
Map<Visit, Location> visitAndLocation;
VisitsBloc() {
visitAndLocations.listen((data) {
visitAndLocation = data;
});
}
final _newOne = new BehaviorSubject<Map<Visit, Location>>();
Stream<Map<Visit, Location>> get visitAndLocations => Observable.combineLatest2(_visits.stream, _locations.stream, (List<vis.Visit> visits, Map<int, Location> locations) {
Map<vis.Visit, Location> result = {};
visits.forEach((visit) {
if (locations.containsKey(visit.skuLocationId)) {
result[visit] = locations[visit.skuLocationId];
}
});
if (result.isNotEmpty) {
_newOne.add(result);
}
});
}
I didn't use .broadcast
because it slowed my UI.
Upvotes: 4
Reputation: 657148
StreamSplitter.split() from the async
package can be used for this use case:
import 'dart:convert';
import 'dart:io';
import 'package:async/async.dart';
...
main() async {
  // Process.start returns a Future<Process>, so it has to be awaited.
  var process = await Process.start(...);
  var stdout = StreamSplitter<List<int>>(process.stdout);
  readStdoutFoo(stdout.split());
  readStdoutBar(stdout.split());
}
readStdoutFoo(Stream<List<int>> stdout) {
stdout.transform(utf8.decoder)...
}
readStdoutBar(Stream<List<int>> stdout) {
stdout.transform(utf8.decoder)...
}
Upvotes: 0
Reputation: 37
For me, defining my stream as a global variable worked.
Stream infostream was declared inside the ...State class of a stateful widget; I moved the declaration outside the widget and it worked.
(Not sure this is the best solution, but give it a try.)
Upvotes: 1
Reputation: 1344
For those of you running into this with Future.asStream()
, you'll need Future.asStream().shareReplay(maxSize: 1)
(shareReplay comes from the rxdart package) to make it a broadcast/hot stream.
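If rxdart is not available, a rough dart:async-only alternative is asBroadcastStream(). This is a different technique from shareReplay, swapped in here only to keep the sketch dependency-free; the function name is made up.

```dart
import 'dart:async';

/// Lets two listeners consume the stream of one future.
/// (Hypothetical helper for illustration.)
Future<List<int>> shareFutureStream() async {
  // Future.asStream() yields a single-subscription stream;
  // asBroadcastStream() allows more than one listener to attach.
  final shared = Future<int>.value(42).asStream().asBroadcastStream();

  // Both listeners subscribe before the future's value is delivered.
  final first = shared.first;
  final second = shared.first;
  return Future.wait([first, second]);
}

Future<void> main() async {
  print(await shareFutureStream());
}
```

Unlike shareReplay(maxSize: 1), this does not replay the value to a listener that subscribes after it has been delivered.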
Upvotes: 1
Reputation: 2283
Just to sum up:
The main difference is that broadcast()
creates a Stream
that multiple listeners can subscribe to, but it needs at least one listener before it starts emitting items.
A Stream should be inert until a subscriber starts listening on it (using the [onListen] callback to start producing events).
asBroadcastStream
turns an existing Stream
into one that supports multiple listeners, but it doesn't need to be listened to in order to start emitting, since it calls onListen()
under the hood.
Upvotes: 4
Reputation: 4562
You should use the following.
StreamController<...> _controller = StreamController<...>.broadcast();
Upvotes: 171
Reputation: 4331
The problem was caused by not disposing of the controllers in the bloc.
void dispose() {
monthChangedController.close();
dayPressedController.close();
resultController.close();
}
Upvotes: 22
Reputation: 276949
The most common form of Stream
can be listened to only once. If you try to add a second listener, it throws
Bad state: Stream has already been listened to
To prevent this error, expose a broadcast Stream
. You can convert your stream to a broadcast stream using myStream.asBroadcastStream()
. This needs to be done inside the class that exposes the Stream
, not in the stream parameter of StreamBuilder
. Since asBroadcastStream
listens to the original stream internally to generate the broadcast one, you can't call this method twice on the same stream.
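One way to follow this advice is to convert the stream once inside the class and cache the result, so that every caller receives the same broadcast stream. The sketch below assumes a made-up CounterBloc class; the field name results mirrors the bloc in the question, but everything else is hypothetical.

```dart
import 'dart:async';

/// Hypothetical bloc that exposes its stream as a broadcast stream.
class CounterBloc {
  final _controller = StreamController<int>();

  // asBroadcastStream() is called once, when this field is first read,
  // and the resulting stream is reused for every later access.
  late final Stream<int> results = _controller.stream.asBroadcastStream();

  void add(int value) => _controller.add(value);

  Future<void> dispose() => _controller.close();
}

/// Subscribes twice to the same bloc, as a rebuilt StreamBuilder would.
Future<List<String>> listenTwice() async {
  final bloc = CounterBloc();
  final log = <String>[];
  bloc.results.listen((v) => log.add('first:$v'));
  bloc.results.listen((v) => log.add('second:$v'));
  bloc.add(7);
  await bloc.dispose();
  return log;
}

Future<void> main() async {
  print(await listenTwice());
}
```

A getter that called asBroadcastStream() on every access would instead try to listen to the original stream again, which is the failure described above.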
Upvotes: 85