Natanel

Reputation: 1758

Not receiving inputs when using `.receive(on: DispatchQueue.main)`

I’m trying to switch to the main thread downstream with `.receive(on: DispatchQueue.main)`, but then I don’t receive any inputs, whether I use `.subscribe(_:)` or `.sink(receiveValue:)`. If I don’t switch threads, I receive the inputs as expected.

Publisher

extension URLSessionWebSocketTask {
  struct ReceivePublisher: Publisher {
    typealias Output = Message
    typealias Failure = Error

    let task: URLSessionWebSocketTask

    func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
      task.receive { result in
        switch result {
        case .success(let message): _ = subscriber.receive(message)
        case .failure(let error): subscriber.receive(completion: .failure(error))
        }
      }
    }
  }
}

extension URLSessionWebSocketTask {
  func receivePublisher() -> ReceivePublisher {
    ReceivePublisher(task: self)
  }
}

Subscriber

extension ViewModel: Subscriber {
  typealias Input = URLSessionWebSocketTask.Message
  typealias Failure = Error

  func receive(subscription: Subscription) {}

  func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
    // Handle input here.
    // When using `.receive(on:)`, this method is not called when it should be.
    return .unlimited
  }

  func receive(completion: Subscribers.Completion<Error>) {}
}

Subscribe

socketTask.receivePublisher()
  .receive(on: DispatchQueue.main)
  .subscribe(viewModel)
socketTask.resume()

Upvotes: 0

Views: 2040

Answers (1)

keji

Reputation: 5990

The `AnyCancellable` returned by `subscribe<S>(_ subject: S) -> AnyCancellable` calls `cancel()` when it is deinitialized. Therefore, if you don't store it, it is deinitialized (and the subscription cancelled) as soon as the calling scope exits.
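The effect is easy to reproduce in isolation. The following is only a minimal sketch, not the asker's socket code: it assumes a `PassthroughSubject` as the publisher and uses `sink`, which also returns an `AnyCancellable`. The function name `tokenLifetimeDemo` is made up for this illustration.

```swift
import Combine

// Hypothetical helper: returns the values that actually reached a subscriber.
func tokenLifetimeDemo() -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    var received: [Int] = []

    // The returned AnyCancellable is discarded, so it is deinitialized
    // immediately and the subscription is cancelled before any value arrives.
    _ = subject.sink { received.append($0) }
    subject.send(1)

    // Holding on to the AnyCancellable keeps the subscription alive.
    let token = subject.sink { received.append($0 * 10) }
    subject.send(2)

    // Explicit cancellation (or releasing the token) stops delivery.
    token.cancel()
    subject.send(3)

    return received
}

print(tokenLifetimeDemo()) // [20]
```

Only the value sent while `token` was alive and not yet cancelled is delivered.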

None of the WWDC videos and tutorials I have seen address how to handle this. From what I've seen, people are drifting toward a solution like RxSwift's DisposeBag.

Update (Beta 4): Combine now provides a `store(in:)` method on `AnyCancellable` that does pretty much what my old solution did. You can store the `AnyCancellable`s in a `Set<AnyCancellable>`:

var cancellables = Set<AnyCancellable>()
...
override func viewDidLoad() {
    super.viewDidLoad()
    ...
    socketTask.receivePublisher()
        .receive(on: DispatchQueue.main)
        .subscribe(viewModel)
        .store(in: &cancellables)
}

This way the set (and every `AnyCancellable` in it) is deinitialized when the containing class is deinitialized, which cancels the subscriptions at that point.
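To see the lifetime coupling on its own, here is a small sketch under stated assumptions: `Owner` is a hypothetical stand-in for the containing class, the publisher is a `PassthroughSubject` rather than the socket publisher, and `sink` is used because it returns an `AnyCancellable` that `store(in:)` can take.

```swift
import Combine

// Hypothetical stand-in for the class that owns the subscriptions.
final class Owner {
    private var cancellables = Set<AnyCancellable>()

    init(subject: PassthroughSubject<Int, Never>, onValue: @escaping (Int) -> Void) {
        subject
            .sink(receiveValue: onValue)
            .store(in: &cancellables) // the set now keeps the subscription alive
    }
}

// Hypothetical helper: returns the values delivered before and after
// the owner goes away.
func storeDemo() -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    var received: [Int] = []

    do {
        let owner = Owner(subject: subject) { received.append($0) }
        // Keep the owner alive for the duration of this send.
        withExtendedLifetime(owner) { subject.send(1) }
    }

    // The owner, its set, and the stored AnyCancellable are gone here,
    // so this value has no subscriber.
    subject.send(2)

    return received
}

print(storeDemo()) // [1]
```

The value sent while the owner exists is delivered; the one sent after it is released is not.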

Outdated:

If you want a solution that works for every `Cancellable` and reads more fluently in a call chain, you could extend `Cancellable` like this:

extension Cancellable {

    func cancel(with cancellables: inout [AnyCancellable]) {
        if let cancellable = self as? AnyCancellable {
            cancellables.append(cancellable)
        } else {
            cancellables.append(AnyCancellable(self))
        }
    }

}

Upvotes: 2
