Raghu N
Raghu N

Reputation: 103

How do I cancel a StreamSubscription inside a Cubit?

I have a cubit that listens to a stream of messages, and emits a state which holds the messages. In the screen, I am using a BlocProvider to access the cubit, and a BlocBuilder to display the messages.

In instances like below, do I need to close the StreamSubscription created on listen()? Is there a clean way to do it?

class MessageCubit extends Cubit<MessageState> {
  final GetMessagesUseCase getMessagesUseCase;
  MessageCubit({this.getMessagesUseCase}) : super(MessageInitial());
  Future<void> getMessages({String senderId, String recipientId}) async {
    emit(MessageLoading());
    try {
      final messagesStreamData = getMessagesUseCase.call();

      //This is where I listen to a stream
      messagesStreamData.listen((messages) {
        emit(MessageLoaded(messages: messages));
      });


    } on SocketException catch (_) {
      emit(MessageFailure());
    } catch (_) {
      emit(MessageFailure());
    }
  }
}

Upvotes: 3

Views: 4114

Answers (1)

Robert Sandberg
Robert Sandberg

Reputation: 8597

You don't need to close the subscription, but you should as good practice to avoid potential memory leaks. Since it is so straightforward it's not any sacrifice.

  1. Create a class variable of type StreamSubscription<your type>. Let's say it's named sub.
  2. In getMessages before listen: await sub?.cancel()
  3. Then sub = messagesStreamData.listen(...
  4. Override the Cubit's close method and run the same command as in bullet 2.

Full code:

class MessageCubit extends Cubit<MessageState> {
  final GetMessagesUseCase getMessagesUseCase;

  // Added
  StreamSubscription<YOUR_MESSAGES_TYPE> sub;

  MessageCubit({this.getMessagesUseCase}) : super(MessageInitial());
  Future<void> getMessages({String senderId, String recipientId}) async {
    emit(MessageLoading());
    try {
      final messagesStreamData = getMessagesUseCase.call();

      // Added
      await sub?.cancel();
      //This is where I listen to a stream
      sub = messagesStreamData.listen((messages) {
        emit(MessageLoaded(messages: messages));
      });


    } on SocketException catch (_) {
      emit(MessageFailure());
    } catch (_) {
      emit(MessageFailure());
    }
  }

  // Added    
  @override
  Future<void> close() async {
    await sub?.cancel();
    return super.close();
  }
}

Upvotes: 12

Related Questions