Paul.s

Reputation: 38728

How to get concurrency when using AsyncLines

I'm trying to use AsyncLineSequence with Process to run many instances of a shell script at the same time. With my usage of AsyncLineSequence, the output of the Process invocations is not interleaved as I would expect. It feels like I am misunderstanding something fundamental, because this seems like it should work.

Here's a reproduction in a playground:

import Cocoa

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    exit(EXIT_SUCCESS)
}

func run(label: String) throws {
    let process = Process()
    process.executableURL = URL(fileURLWithPath: "/usr/bin/yes")
    let pipe = Pipe()
    process.standardOutput = pipe

    Task {
        for try await _ in pipe.fileHandleForReading.bytes.lines {
            print(label)
        }
    }

    try process.run()
}

Task {
    try run(label: "a")
}

Task {
    try run(label: "b")
}

The above prints only a or only b, never both. If I stop using AsyncLineSequence and set a readabilityHandler instead, like this:

import Cocoa

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    exit(EXIT_SUCCESS)
}

func run(label: String) throws {
    let process = Process()
    process.executableURL = URL(fileURLWithPath: "/usr/bin/yes")
    let pipe = Pipe()
    process.standardOutput = pipe

    pipe.fileHandleForReading.readabilityHandler = { _ in
        print(label)
    }

    try process.run()
}

Task {
    try run(label: "a")
}

Task {
    try run(label: "b")
}

then both a and b are printed, interleaved.

To add to my confusion, if I use URLSession to get async lines by reading an arbitrary file, the print statements for a and b do interleave as I'd expect:

import Cocoa

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    exit(EXIT_SUCCESS)
}

Task {
    for try await _ in try await URLSession.shared.bytes(from: URL(fileURLWithPath: "/usr/bin/yes")).0.lines {
        print("a")
    }
}

Task {
    for try await _ in try await URLSession.shared.bytes(from: URL(fileURLWithPath: "/usr/bin/yes")).0.lines {
        print("b")
    }
}

If I replace URLSession with FileHandle in the above, I am back to no interleaving: all of one file is read, then all of the other:

import Cocoa

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    exit(EXIT_SUCCESS)
}

Task {
    for try await _ in try FileHandle(forReadingFrom: URL(fileURLWithPath: "/usr/bin/yes")).bytes.lines {
        print("a")
    }
}

Task {
    for try await _ in try FileHandle(forReadingFrom: URL(fileURLWithPath: "/usr/bin/yes")).bytes.lines {
        print("b")
    }
}

Upvotes: 2

Views: 335

Answers (1)

Rob

Reputation: 437582

When I tried this (running for 10 seconds rather than 2, and in an app rather than a playground), I did see the output jumping back and forth.

Admittedly, it was not one-for-one interleaving: it was lots of “a”s followed by lots of “b”s, and then the pattern repeated. But there is no reason it would interleave perfectly one-for-one between the two processes. While lines emits an asynchronous sequence of lines, behind the scenes it is likely reading chunks of output from the pipe rather than consuming it line by line, which would be very inefficient. (And, IMHO, it’s interesting that the URLSession behavior is different, but not terribly surprising.) You also effectively have two processes racing, so there is no reason to expect graceful, alternating behavior between the two.
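To put a rough number on the chunking point, here is a small arithmetic sketch. The 4096-byte chunk size and the `lineCount(inChunkOf:bytesPerLine:)` helper are assumptions made up for illustration; the actual buffer size used behind `bytes.lines` is not something this answer establishes.

```swift
// Illustration only: 4096 is an assumed chunk size, not a documented value.
// `lineCount(inChunkOf:bytesPerLine:)` is a hypothetical helper.
func lineCount(inChunkOf byteCount: Int, bytesPerLine: Int) -> Int {
    byteCount / bytesPerLine
}

// `yes` prints "y\n", which is 2 bytes per line.
let burst = lineCount(inChunkOf: 4096, bytesPerLine: 2)
print("One read of 4096 bytes holds \(burst) lines")
```

Under that assumption, a single read from one pipe already carries a long run of lines, which would show up as a burst of one label before the other task gets a turn.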

If you replace yes with a program that waits a little between lines of output (e.g., I had it wait for 0.01 seconds between each line), then you will see it interleave a bit more frequently. Alternatively, when I added an actor to keep track of which process last emitted a line of output, that was enough to trigger an immediate back-and-forth, processing one line from each yes output alternately.
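A minimal sketch of what such an actor might look like follows. The names `LastEmitter` and `consume(_:label:tracker:)` are hypothetical and this is not the exact code from my test; the idea is only that each line awaits a call on a shared actor, which adds a suspension point to every iteration.

```swift
// Hypothetical actor: remembers which label last produced a line
// and counts how often the producer changes.
actor LastEmitter {
    private var last: String?
    private(set) var switches = 0

    func record(_ label: String) {
        if let previous = last, previous != label {
            switches += 1
        }
        last = label
    }
}

// Hypothetical helper: consumes any async sequence of lines,
// hopping to the shared actor once per line before printing.
func consume<S: AsyncSequence>(_ lines: S, label: String, tracker: LastEmitter) async throws {
    for try await _ in lines {
        await tracker.record(label)   // suspension point on every line
        print(label)
    }
}
```

In the question's `run(label:)`, the body of the `Task` could then become `try await consume(pipe.fileHandleForReading.bytes.lines, label: label, tracker: tracker)`, with one `LastEmitter` instance shared by both calls.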

You might also want to consider the implications of running these two loops with Task { ... }, as that will run each “operation asynchronously as part of a new top-level task on behalf of the current actor” [emphasis added]. You might consider detached tasks or separate actors, to reduce contention on the current actor handling both loops. In my tests, it did not change the results too dramatically, but your mileage may vary. Regardless, it is something to be aware of.
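As a sketch of the detached-task variant, assuming a hypothetical helper named `consumeDetached(label:lines:)` (not part of any library), the loop can be moved into `Task.detached` so that it does not inherit the current actor:

```swift
// Hypothetical helper: runs the line loop in a detached task, which does
// not inherit the actor context of the caller. The sequence is created
// inside the task by the `makeLines` closure.
@discardableResult
func consumeDetached<S: AsyncSequence>(
    label: String,
    lines makeLines: @escaping @Sendable () throws -> S
) -> Task<Int, Error> {
    Task.detached {
        var count = 0
        for try await _ in try makeLines() {
            print(label)
            count += 1
        }
        return count   // number of lines consumed, handy for checking
    }
}
```

In the question's `run(label:)`, the `Task { ... }` block could be replaced with `consumeDetached(label: label) { pipe.fileHandleForReading.bytes.lines }`. Depending on compiler settings, capturing `pipe` in a `@Sendable` closure may produce a concurrency warning, so treat this as a starting point rather than a finished solution.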

Upvotes: 3
