joshmori

Reputation: 492

Grand Central Dispatch for complex flow?

I have five time-consuming task functions, a, b, c, d, and e, each of which takes a completion handler.

There are constraints between them:

  1. Both b & c wait for a to finish
  2. The last task e waits for b & c & d to finish


If there were no task d, I could write the code in Swift like this (not tested yet):

let group = DispatchGroup()

group.enter()
a() { group.leave() }
group.wait()

group.enter()
b() { group.leave() }

group.enter()
c() { group.leave() }

group.notify(queue: .main) {
    e()
}

How can I add task d so that it does not have to wait for a to complete?


Edited on 4/30 10:00 (+8)

Code Different said

the easiest approach is to make the download function synchronous and add a warning to its documentation that it should never be called from the main thread.

So I made a version based on that. This approach cannot handle return values from the concurrent calls, but it looks a lot like async/await, so I'm satisfied for now. Thank you all.

The async/await-like part is:

    myQueue.async {
        downloadSync("A")
        downloadSync("B", isConcurrent: true)
        downloadSync("C", isConcurrent: true)
        downloadSync("D", 4, isConcurrent: true)
        waitConcurrentJobs()
        downloadSync("E")
    }

And the full code is below.

    let myGroup = DispatchGroup()
    let myQueue = DispatchQueue(label: "for Sync/Blocking version of async functions")

    func waitConcurrentJobs() {
        myGroup.wait()
    }

    // original function (async version, no source code)
    func download(_ something: String, _ seconds: UInt32 = 1, completionHandler: @escaping ()->Void = {}) {
        print("Downloading \(something)")
        DispatchQueue.global().async {
            sleep(seconds)
            print("\(something) is downloaded")
            completionHandler()
        }
    }

    // wrapped function (synced version)
    // Warning:
    // It blocks the current thread !!!
    // Do not call it on main thread
    func downloadSync(
        _ something: String,
        _ seconds: UInt32 = 1,
        isConcurrent: Bool = false
        ){
        myGroup.enter()
        download(something, seconds) { myGroup.leave() }
        if !isConcurrent {
            myGroup.wait()
        }
    }

    // Now it really looks like ES8 async/await
    myQueue.async {
        downloadSync("A")
        downloadSync("B", isConcurrent: true)
        downloadSync("C", isConcurrent: true)
        downloadSync("D", 4, isConcurrent: true)
        waitConcurrentJobs()
        downloadSync("E")
    }

Results:

[image: console output of the run]
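
The limitation noted above (return values from the concurrent calls are lost) can be worked around by storing each result under a lock before leaving the group. This is only a minimal sketch: `fetch(_:completionHandler:)` and `fetchAllSync(_:)` are hypothetical names that are not part of the original code, and `fetch` merely stands in for an async function that hands back a `String`.

```swift
import Foundation

// Hypothetical async function (assumption): delivers a value through its
// completion handler on a background queue.
func fetch(_ name: String, completionHandler: @escaping (String) -> Void) {
    DispatchQueue.global().async {
        completionHandler("\(name) data")
    }
}

// Starts all fetches concurrently and blocks the calling thread until every
// one of them has reported back. Like downloadSync, it should not be called
// on the main thread of an app.
func fetchAllSync(_ names: [String]) -> [String: String] {
    let group = DispatchGroup()
    let lock = NSLock()
    var results: [String: String] = [:]

    for name in names {
        group.enter()
        fetch(name) { value in
            // Completion handlers may run on different threads, so the
            // shared dictionary is only touched while holding the lock.
            lock.lock()
            results[name] = value
            lock.unlock()
            group.leave()
        }
    }

    group.wait()
    lock.lock()
    defer { lock.unlock() }
    return results
}
```

Called as `fetchAllSync(["B", "C", "D"])`, this would return one entry per name once all three handlers have fired.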

Upvotes: 2

Views: 279

Answers (4)

Alex Belozierov

Reputation: 129

You can use this framework to implement the async/await pattern: https://github.com/belozierov/SwiftCoroutine

When you call await, it doesn't block the thread but only suspends the coroutine, so you can use it on the main thread as well.

func awaitAPICall(_ url: URL) throws -> String? {
    let future = URLSession.shared.dataTaskFuture(for: url)
    let data = try future.await().data
    return String(data: data, encoding: .utf8)
}

func load(url: URL) {
    DispatchQueue.main.startCoroutine {
        let result1 = try self.awaitAPICall(url)
        let result2 = try self.awaitAPICall2(result1)
        let result3 = try self.awaitAPICall3(result2)
        print(result3)
    }
}

Upvotes: 0

Rob N

Reputation: 16449

Your original effort seems very close to me. You could make a minor adjustment: make B, C, and D be the group that finishes to trigger E.

A could be another group, but since it's one task, I don't see the point. Trigger B and C when it's done.

Note that unlike some of the example code in your question and other answers, in the code below, D and A can both start right away and run in parallel.

let q = DispatchQueue(label: "my-queue", attributes: .concurrent)
let g = DispatchGroup()
func taskA() {  print("A")  }
func taskB() {  print("B"); g.leave()  }
func taskC() {  print("C"); g.leave()  }
func taskD() {  print("D"); g.leave()  }
func taskE() {  print("E")  }
g.enter()
g.enter()
g.enter()
q.async {
    taskA()
    q.async(execute: taskB)
    q.async(execute: taskC)
}
q.async(execute: taskD)
g.notify(queue: q, execute: taskE)
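
The same shape also works when the tasks are asynchronous and take completion handlers, as in the question. The following is a minimal sketch under stated assumptions: `task(_:seconds:log:completion:)`, `EventLog`, and `runFlow(log:completion:)` are hypothetical helpers introduced only for illustration, and the delays are arbitrary.

```swift
import Foundation

// Hypothetical thread-safe log used to record the order in which tasks finish.
final class EventLog {
    private let lock = NSLock()
    private var entries: [String] = []

    func append(_ entry: String) {
        lock.lock()
        entries.append(entry)
        lock.unlock()
    }

    var snapshot: [String] {
        lock.lock()
        defer { lock.unlock() }
        return entries
    }
}

// Hypothetical stand-in for the question's tasks: finishes after a delay on a
// background queue, records its name, then calls the completion handler.
func task(_ name: String, seconds: Double, log: EventLog,
          completion: @escaping () -> Void) {
    DispatchQueue.global().asyncAfter(deadline: .now() + seconds) {
        log.append(name)
        completion()
    }
}

func runFlow(log: EventLog, completion: @escaping () -> Void) {
    let group = DispatchGroup()
    // Reserve three slots up front: one each for b, c and d.
    group.enter()
    group.enter()
    group.enter()

    // a and d are both started right away and run in parallel.
    task("a", seconds: 0.2, log: log) {
        // b and c are started only after a has finished.
        task("b", seconds: 0.1, log: log) { group.leave() }
        task("c", seconds: 0.1, log: log) { group.leave() }
    }
    task("d", seconds: 0.1, log: log) { group.leave() }

    // e is started once b, c and d have all left the group.
    group.notify(queue: .global()) {
        task("e", seconds: 0.1, log: log, completion: completion)
    }
}
```

The sketch notifies on a global queue so that it also runs in a command-line script; in an app, `queue: .main` would be the usual choice if e touches the UI.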

Upvotes: 0

CouchDeveloper

Reputation: 19174

I would like to show an alternative solution using Scala-like futures:

let result = funcA().flatMap { resultA in
    return [funcB(param: resultA.0),
            funcC(param: resultA.1),
            funcD()]
        .fold(initial: [String]()) { (combined, element) in
            return combined + [element]
    }
}.flatMap { result in
    return funcE(param: result)
}.map { result in
    print(result)
}

That's it basically. It handles errors (implicitly) and is thread-safe. No Operation subclasses ;)

Note that funcD will be called only when A completes successfully. Since funcA() can fail, it would make no sense to call funcD in that case. The code can easily be adapted to start funcD regardless, if required.

Please compare this to function foo() from my other solution which uses Dispatch Groups and Dispatch Queues.

Below are example definitions of the async functions, each of which passes its result on to the next one:

func funcA() -> Future<(String, String)> {
    print("Start A")
    let promise = Promise<(String, String)>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
        print("Complete A")
        promise.complete(("A1", "A2"))
    }
    return promise.future
}

func funcB(param: String) -> Future<String> {
    print("Start B")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        print("Complete B")
        promise.complete("\(param) -> B")
    }
    return promise.future
}

func funcC(param: String) -> Future<String> {
    print("Start C")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
        print("Complete C")
        promise.complete("\(param) -> C")
    }
    return promise.future
}

func funcD() -> Future<String> {
    print("Start D")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
        print("Complete D")
        promise.complete("D")
    }
    return promise.future
}

func funcE(param: [String]) -> Future<String> {
    print("Start E")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
        print("Complete E")
        promise.complete("\(param) -> E")
    }
    return promise.future
}

Which prints this to the console:

Start A
Complete A
Start B
Start C
Start D
Complete B
Complete C
Complete D
Start E
Complete E
["A1 -> B", "A2 -> C", "D"] -> E

Hint: there are a couple of Future and Promise libraries available.
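
To give an idea of what such a library provides, here is a deliberately small sketch of a `Future`/`Promise` pair with `map` and `flatMap`. It is an assumption-laden illustration, not the library used above: it has no error handling and no `fold`, and `delayed(_:after:)` and `demoChain()` are hypothetical helpers.

```swift
import Foundation

// Hypothetical minimal Future: stores one value and notifies callbacks.
final class Future<T> {
    private let lock = NSLock()
    private var value: T?
    private var callbacks: [(T) -> Void] = []

    // Runs the callback once the value exists (immediately if it already does).
    func onComplete(_ callback: @escaping (T) -> Void) {
        lock.lock()
        if let existing = value {
            lock.unlock()
            callback(existing)
        } else {
            callbacks.append(callback)
            lock.unlock()
        }
    }

    // Called by Promise; only the first completion has any effect.
    fileprivate func complete(_ newValue: T) {
        lock.lock()
        if value != nil {
            lock.unlock()
            return
        }
        value = newValue
        let pending = callbacks
        callbacks.removeAll()
        lock.unlock()
        for callback in pending { callback(newValue) }
    }

    // Transforms the eventual value.
    func map<U>(_ transform: @escaping (T) -> U) -> Future<U> {
        let promise = Promise<U>()
        onComplete { result in promise.complete(transform(result)) }
        return promise.future
    }

    // Chains a second asynchronous step that depends on the first result.
    func flatMap<U>(_ transform: @escaping (T) -> Future<U>) -> Future<U> {
        let promise = Promise<U>()
        onComplete { result in
            transform(result).onComplete { next in promise.complete(next) }
        }
        return promise.future
    }
}

// The writable side: whoever holds the Promise completes its Future.
final class Promise<T> {
    let future = Future<T>()
    func complete(_ value: T) { future.complete(value) }
}

// Hypothetical helper: a future that completes with `value` after a delay.
func delayed<T>(_ value: T, after seconds: Double) -> Future<T> {
    let promise = Promise<T>()
    DispatchQueue.global().asyncAfter(deadline: .now() + seconds) {
        promise.complete(value)
    }
    return promise.future
}

// Two dependent steps followed by a synchronous transformation.
func demoChain() -> Future<String> {
    return delayed("A1", after: 0.1)
        .flatMap { a in delayed("\(a) -> B", after: 0.1) }
        .map { b in b + " -> done" }
}
```

A real library would add failure propagation and combinators such as the `fold` used above; the sketch only shows how chaining can be built from callbacks.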

Upvotes: -1

Code Different

Reputation: 93191

Edit: the easiest approach is to make the download function synchronous and add a warning to its documentation that it should never be called from the main thread. The pyramid of doom for async functions is the reason why coroutines were proposed, by none other than Chris Lattner, Swift's creator. As of April 2018, it's not yet a formal proposal waiting for review, so chances are that you won't see it in Swift 5.

A synchronous download function:

// Never call this from main thread
func download(_ something: String, _ seconds: UInt32 = 1, completionHandler: @escaping ()->Void = {}) {
    let group = DispatchGroup()

    print("Downloading \(something)")
    group.enter()
    DispatchQueue.global().async {
        sleep(seconds)
        print("\(something) is downloaded")
        completionHandler()
        group.leave()
    }
    group.wait()
}

And the NSOperation / NSOperationQueue setup:

let opA = BlockOperation() {
    self.download("A")
}
let opB = BlockOperation() {
    self.download("B")
}
let opC = BlockOperation() {
    self.download("C")
}
let opD = BlockOperation() {
    self.download("D", 4)
}
let opE = BlockOperation() {
    self.download("E")
}

opB.addDependency(opA)
opC.addDependency(opA)

opE.addDependency(opB)
opE.addDependency(opC)
opE.addDependency(opD)

let operationQueue = OperationQueue()
operationQueue.addOperations([opA, opB, opC, opD, opE], waitUntilFinished: false)
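
For trying the dependency setup outside a class (the code above calls `self.download`), the following self-contained sketch may help. It assumes a hypothetical `StepLog` helper and replaces the download with a short `Thread.sleep`; `runOperations()` is likewise an illustrative name, and it passes `waitUntilFinished: true` so the caller can inspect the order afterwards.

```swift
import Foundation

// Hypothetical thread-safe log of finished steps (not part of the answer).
final class StepLog {
    private let lock = NSLock()
    private var steps: [String] = []

    func append(_ step: String) {
        lock.lock()
        steps.append(step)
        lock.unlock()
    }

    var snapshot: [String] {
        lock.lock()
        defer { lock.unlock() }
        return steps
    }
}

func runOperations() -> [String] {
    let log = StepLog()

    // Stand-in for the blocking download: sleeps, then records its name.
    func work(_ name: String, _ seconds: TimeInterval = 0.05) -> BlockOperation {
        return BlockOperation {
            Thread.sleep(forTimeInterval: seconds)
            log.append(name)
        }
    }

    let opA = work("A")
    let opB = work("B")
    let opC = work("C")
    let opD = work("D", 0.2)
    let opE = work("E")

    // B and C wait for A; E waits for B, C and D.
    opB.addDependency(opA)
    opC.addDependency(opA)
    for op in [opB, opC, opD] {
        opE.addDependency(op)
    }

    let queue = OperationQueue()
    // Blocks the calling thread until all five operations have finished.
    queue.addOperations([opA, opB, opC, opD, opE], waitUntilFinished: true)
    return log.snapshot
}
```

Because D has no dependencies, it starts together with A; only E is held back until B, C, and D are done.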

Upvotes: 0
