Reputation: 1758
I’m trying to change to the main thread in the downstream with .receive(on: DispatchQueue.main)
but then I don’t receive inputs when using either .subscribe(:)
or .sink(receiveValue:)
. If I don’t change threads I do receive the proper inputs.
Publisher
extension URLSessionWebSocketTask {
struct ReceivePublisher: Publisher {
typealias Output = Message
typealias Failure = Error
let task: URLSessionWebSocketTask
func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
task.receive { result in
switch result {
case .success(let message): _ = subscriber.receive(message)
case .failure(let error): subscriber.receive(completion: .failure(error))
}
}
}
}
}
extension URLSessionWebSocketTask {
func receivePublisher() -> ReceivePublisher {
ReceivePublisher(task: self)
}
}
Subscriber
extension ViewModel: Subscriber {
typealias Input = URLSessionWebSocketTask.Message
typealias Failure = Error
func receive(subscription: Subscription) {}
func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
// Handle input here.
// When using `.receive(on:)` this method is not called when should be.
return .unlimited
}
func receive(completion: Subscribers.Completion<Error>) {}
}
Subscribe
socketTask.receivePublisher()
.receive(on: DispatchQueue.main)
.subscribe(viewModel)
socketTask.resume()
Upvotes: 0
Views: 2040
Reputation: 5990
The AnyCancellable
returned by subscribe<S>(_ subject: S) -> AnyCancellable
will call cancel()
when it has been deinitialized. Therefore if you don't save it it will be deinitialized when the calling block goes out of scope.
Out of the videos and tutorials I have seen from WWDC, how to work with this was never addressed. What I've seen is that people are drifting towards RxSwift's DisposeBag
solution.
Update Beta 4:
Combine now comes with a method on AnyCancellable
called: store(in:)
that does pretty much what my old solution does. You can just store the AnyCancellable
s in a set of AnyCancellable
:
var cancellables = Set<AnyCancellable>()
...
override func viewDidLoad() {
super.viewDidLoad()
...
socketTask.receivePublisher()
.receive(on: DispatchQueue.main)
.subscribe(viewModel)
.store(in: &cancellables)
}
This way the array (and all AnyCancellable
s) will be deinitialized when the containing class is deinitialized.
Outdated:
If you want a solution for all Cancellable
s that can be used in a way that flows better you could extend Cancellable
as such:
extension Cancellable {
func cancel(with cancellables: inout [AnyCancellable]) {
if let cancellable = self as? AnyCancellable {
cancellables.append(cancellable)
} else {
cancellables.append(AnyCancellable(self))
}
}
}
Upvotes: 2