JAHelia

Reputation: 7932

Combine: Publisher sometimes loses value and completes

I have a simple Deferred publisher that reads data from disk, and I display the data in a SwiftUI List. The publisher works well most of the time, but sometimes it loses its value (an array of Model objects) and completes with a finished message. I tried a workaround mentioned here that uses the buffer operator to keep the value in a buffer, because I believe that a Combine publisher, by design, won't pass data downstream if no subscriber has requested demand, so it drops the data and completes. However, using buffer didn't solve the issue.

The code I have:

enum FileError: Error {
    case someError
}

class ViewModel: ObservableObject {
    @Published var modelArray = [Model]()
    private var subscriptions = Set<AnyCancellable>()
    func readData() {
        DataSource()
            .readFromBundle(resource: "Sample", type: "json")
            .receive(on: DispatchQueue.main)
            .sink(receiveCompletion: { completion in
                print("Completion: \(completion)")
            }) { array in
                self.modelArray = array
        }.store(in: &subscriptions)
    }
}
struct ContentView: View {
    @ObservedObject var viewModel: ViewModel

    var body: some View {
        VStack {
            List(self.viewModel.modelArray) { model in
                Text("\(model.name)")
            }
        }
        .onAppear {
            self.viewModel.readData()
        }
    }
}

struct Model: Codable, Identifiable {
    var id: Int
    var name: String
}

class DataSource {
    private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)

    func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
            Deferred {
                 Future { promise in
                    guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
                      let data = try? Data(contentsOf: url),
                      let modelArray = try? JSONDecoder().decode([Model].self, from: data)
                      else {
                        promise(.failure(.someError))
                        return
                    }
                      promise(.success(modelArray))
                }
            }
           .receive(on: self.readQueue)
           .eraseToAnyPublisher()
        }
}

This is a link to download a working sample project.

EDIT:

Environment: Xcode 11.3.1, iOS 13.3 iPhone 11 Pro Max simulator and device.

gif screenshot (notice the console output)


EDIT2:

If I add any downstream operator, combineLatest for example, just before sink in the consumer function readData(), a new behavior appears: chaining an async publisher (readFromBundle) with a sync publisher (combineLatest) results in the value not being delivered at all on iOS 13.3+ devices, and only sometimes being delivered on devices below iOS 13.3, as stated at this link.

Upvotes: 3

Views: 1854

Answers (4)

rens

Reputation: 304

The problem in the code is that the readQueue used in receive(on:) is concurrent. The value and the completion are dispatched to this queue separately, so their order is not guaranteed. If a downstream subscriber receives the completion first, it will cancel its subscription and ignore the value. Making readQueue serial fixes this, as does using another serial queue such as DispatchQueue.main directly.

Using subscribe(on:) instead of receive(on:) worked with the concurrent queue because the promise call then causes the value and the completion to be dispatched together.
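A minimal sketch of the serial-queue variant, in case it helps. It assumes a hypothetical loadNumbers(deliveringOn:) function standing in for readFromBundle, which emits a fixed array rather than reading a file, and it uses a semaphore only so the snippet can run as a plain script:

```swift
import Combine
import Foundation

// Hypothetical stand-in for readFromBundle: emits a fixed array instead of reading a file.
func loadNumbers(deliveringOn queue: DispatchQueue) -> AnyPublisher<[Int], Never> {
    Deferred {
        Future<[Int], Never> { promise in
            promise(.success([1, 2, 3]))
        }
    }
    .receive(on: queue) // the value and the completion are each dispatched to `queue`
    .eraseToAnyPublisher()
}

// Without the `.concurrent` attribute a DispatchQueue is serial,
// so blocks run one at a time in the order they were submitted.
let serialQueue = DispatchQueue(label: "ReadQueue")

var events: [String] = []
var received: [Int] = []
let done = DispatchSemaphore(value: 0)

let cancellable = loadNumbers(deliveringOn: serialQueue)
    .sink(receiveCompletion: { _ in
        events.append("completion")
        done.signal()
    }, receiveValue: { value in
        received = value
        events.append("value")
    })

done.wait() // block the script until the completion has arrived
print(events, received)
```

With the serial queue the value should always be recorded before the completion. Adding attributes: .concurrent back to the queue is what makes that order unreliable.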

Upvotes: 0

user3441734

Reputation: 17572

Let's look at the documentation for .receive(on:):

Specifies the scheduler on which to receive elements from the publisher.

Declaration

func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Publishers.SubscribeOn<Deferred<Future<[Model], FileError>>, DispatchQueue>, S> where S : Scheduler

Discussion

You use the receive(on:options:) operator to receive results on a specific scheduler, such as performing UI work on the main run loop. In contrast with subscribe(on:options:), which affects upstream messages, receive(on:options:) changes the execution context of downstream messages. In the following example, requests to jsonPublisher are performed on backgroundQueue, but elements received from it are performed on RunLoop.main.

let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.

jsonPublisher
    .subscribe(on: backgroundQueue)
    .receive(on: RunLoop.main)
    .subscribe(labelUpdater)

Parameters

scheduler
The scheduler the publisher is to use for element delivery.

options
Scheduler options that customize the element delivery.

Returns

A publisher that delivers elements using the specified scheduler.

In your case, it means:

import SwiftUI
import Combine

enum FileError: Error {
    case someError
}

class ViewModel: ObservableObject {
    @Published var modelArray = [Model]()
    private var subscriptions = Set<AnyCancellable>()
    func readData() {
        DataSource()
            .readFromBundle(resource: "Sample", type: "json")
            .sink(receiveCompletion: { completion in
                print("Completion: \(completion)")
            }) { array in
                print("received value")
                self.modelArray = array
        }.store(in: &subscriptions)
    }
}
struct ContentView: View {
    @ObservedObject var viewModel: ViewModel

    var body: some View {
        VStack {
            List(self.viewModel.modelArray) { model in
                Text("\(model.name)")
            }
        }
        .onAppear {
            self.viewModel.readData()
        }
    }
}

struct Model: Codable, Identifiable {
    var id: Int
    var name: String
}

class DataSource {
    private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)

    func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
            Deferred {
                 Future { promise in
                    guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
                      let data = try? Data(contentsOf: url),
                      let modelArray = try? JSONDecoder().decode([Model].self, from: data)
                      else {
                        promise(.failure(.someError))
                        return
                    }
                      promise(.success(modelArray))
                }
            }
            .subscribe(on: readQueue)
            .receive(on: RunLoop.main)
           .eraseToAnyPublisher()
        }
}

This explains why Asperi's solution works. The difference is that it is not necessary to call .receive(on:) again in readData().

The difference between DispatchQueue.main and RunLoop.main is not significant in your example.
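To see the split between the two operators, here is a small sketch that records which queue each side runs on. The names workQueue, deliveryQueue and queueName are made up for this example, and deliveryQueue stands in for RunLoop.main only so that the snippet can run as a plain script without a running main run loop:

```swift
import Combine
import Foundation

// Tag each queue so code can ask which queue it is currently running on.
let queueName = DispatchSpecificKey<String>()
let workQueue = DispatchQueue(label: "work")
workQueue.setSpecific(key: queueName, value: "work")
let deliveryQueue = DispatchQueue(label: "delivery")
deliveryQueue.setSpecific(key: queueName, value: "delivery")

var producedOn: String?
var deliveredOn: String?
let done = DispatchSemaphore(value: 0)

let cancellable = Deferred {
    Future<Int, Never> { promise in
        // Upstream work: runs where subscribe(on:) performs the subscription.
        producedOn = DispatchQueue.getSpecific(key: queueName)
        promise(.success(42))
    }
}
.subscribe(on: workQueue)     // affects upstream: subscription and the Future's closure
.receive(on: deliveryQueue)   // affects downstream: value and completion delivery
.sink(receiveCompletion: { _ in done.signal() },
      receiveValue: { _ in
          deliveredOn = DispatchQueue.getSpecific(key: queueName)
      })

done.wait() // block the script until the completion has arrived
print(producedOn ?? "nil", deliveredOn ?? "nil")
```

The Future's closure should report the work queue and the sink should report the delivery queue, which is the same arrangement as subscribe(on: readQueue) followed by receive(on: RunLoop.main) in the code above.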

Upvotes: 2

Asperi

Reputation: 258413

This looks like a race condition. Please try the following (based on reading the code only):

1) use background queue explicitly

private let readQueue = DispatchQueue(label: "ReadQueue", qos: .background, 
    attributes: .concurrent)

2) schedule Publisher on this queue instead of receiving on it

.subscribe(on: self.readQueue)

Upvotes: 2

Chris

Reputation: 8126

The first run does not fail; it just needs time to load. You can check this by adding a print before the promise call:

print("ready")
promise(.success(modelArray)) 

Then set a breakpoint on "not loaded yet" and you will see that "not loaded yet" appears before "ready" is printed in the console. The publisher is not dropping the value.

As its name says, onAppear() is called after the UI has been shown:

if self.viewModel.modelArray.count == 0 {
    Text("not loaded yet")
} else {
    List(self.viewModel.modelArray) { model in
        Text("\(model.name)")
    }
}

Upvotes: 0
