Mikel
Mikel

Reputation: 504

Combine sink only receives outputs after they have all been published

I'm trying to perform some iterative work, and use Combine to publish the progress (0.0 - 100.0) using a CurrentValueSubject, which my ViewModel will then subscribe to

(Edit: the ViewModel controls a SwiftUI ProgressView, which is why receive(on: DispatchQueue.main) is used)

What I'm seeing is that the outputs are being published, but sink doesn't receive any of them until the publisher has completed.

Here's a simplified example:

// Class that performs iterative calculations and publish its progress

class JobWorker {
  private var subject: CurrentValueSubject<Double, Never>

  private var progress = 0.0

  init() {
    self.subject = CurrentValueSubject<Double, Never>(progress)
  }

  func getPublisher() {
    return subject.eraseToAnyPublisher()
  }

  func doWork() {
    let tasks = [1,2,3,4,5]
    tasks.forEach { num in

      // ... perform some calculations ...

      self.incrementProgress(20.0)
    }
  }

  func incrementProgress(_ by: Double) {
    progress += by
    if progress >= 100.0 {
      print("PUBLISH completion")
      subject.send(completion: .finished)
    } else {
      print("PUBLISH value \(progress)")
      subject.send(progress)
    }
  }
}
// ViewModel that subscribes to JobWorker's publisher and updates the progress in the view

final class JobViewModel: ObservableObject {
  @Published var progress: Double = 0.0

  private var cancellables = Set<AnyCancellable>()
  private var jobWorker: JobWorker

  init() {
    self.jobWorker = JobWorker()
  }

  func runJob() {
    self.jobWorker
      .getPublisher()
      .receive(on: DispatchQueue.main)
      .handleEvents(
          receiveSubscription: { _ in
              print("RECEIVE subscription")
          },
          receiveOutput: { value in
              print("RECEIVE output \(value)")
          },
          receiveCompletion: { _ in
              print("RECEIVE completion")
          },
          receiveCancel: {
              print("RECEIVE cancel")
          },
          receiveRequest: { _ in
              print("RECEIVE demand")
          }
      )
      .sink { [weak self] (completion) in
          guard let self = self else { return }
          print("SINK completion")
      } receiveValue: { [weak self] (value) in
          guard let self = self else { return }
          print("SINK output \(value)")
          self.progress = value
      }
      .store(in: &cancellables)
    
    print("*** DO WORK ***")
    self.jobWorker.doWork()
  }
}

Calling JobViewModel.runJob results in the following output:

RECEIVE subscription
RECEIVE demand
RECEIVE output 0.0
SINK output 0.0
*** DO WORK ***
PUBLISH value 20.0
PUBLISH value 40.0
PUBLISH value 60.0
PUBLISH value 80.0
PUBLISH value 100.0
PUBLISH completion
RECEIVE output 20.0
SINK output 20.0
RECEIVE output 40.0
SINK output 40.0
RECEIVE output 60.0
SINK output 60.0
RECEIVE output 80.0
SINK output 80.0
RECEIVE output 100.0
SINK output 100.0
RECEIVE completion
SINK completion

After the CurrentValueSubject is first initialized, all of the outputs are published before handleEvents or sink receives anything.

Instead, I would have expected the output to show PUBLISH output x, RECEIVE output x, SINK output x for each of the outputs, followed by the completion.

Upvotes: -1

Views: 936

Answers (1)

Scott Thompson
Scott Thompson

Reputation: 23701

The problem is that you are running your worker on the same thread where you are collecting the results.

Because you are doing a receive(on:) on the main DispatchQueue each value that passes through receive(on:) is roughly equivalent to putting a new block on the main queue to be executed when the queue is free.

Your worker fires up, executing synchronously on the main queue. While it's running, the main queue is tied up and not available for other work.

As the worker does its thing, it is publishing results to the subject, and as part of the publisher pipeline receive(on:) queues up the delivery of each result to the main queue, waiting for that queue to be free. The critical point, however, is that the main queue won't be free to handle those blocks, and report results, until the worker is done because the worker itself is tying up the main queue.

So none of your results are reported until after all the work is one.

I suspect what you want to do is run your work in a different context, off the main thread, so that it can complete asynchronously and only report the results on the main thread.

Here's a playground, based on your code, that does that:

import UIKit
import Combine
import PlaygroundSupport

class JobWorker {
    private var subject = CurrentValueSubject<Double, Never>(0)
    var publisher: AnyPublisher<Double, Never> {
        get { subject.eraseToAnyPublisher() }
    }
    
    func doWork() async {
        do {
            for subtask in 1...5 {
                guard !Task.isCancelled else { break }

                print("doing task \(subtask)")
                self.incrementProgress(by: 20)

                try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)
            }
        } catch is CancellationError {
            print("The Tasks were cancelled")
        } catch {
            print("An unexpected error occured")
        }
    }
    
    private func incrementProgress(by: Double) {
        subject.value = subject.value + by;
        if subject.value >= 100 {
            subject.send(completion: .finished)
        }
    }
}

let worker = JobWorker()
let subscription = worker.publisher
    .print()
    .receive(on: DispatchQueue.main)
    .sink { _ in
        print("done")
    } receiveValue: { value in
        print("New Value Received \(value)")
    }

Task {
    await worker.doWork()
}

PlaygroundPage.current.needsIndefiniteExecution = true

I made your doWork function an async function so I could execute it from an independent Task. I also added a delay because it makes the asynchronous nature of the code a bit easier to see.

In the "main thread, I create a JobWorker and subscribe to its publisher, but to do the work I create a task and run doWork in that separate task. Progress is reported in the main thread, but the work is being done (and completed) in a different execution context.

Upvotes: 1

Related Questions