Jan Nash
Jan Nash

Reputation: 1962

Creating a threadsafe Array, the easy way?

I've just read a post by Basem Emara about creating a threadsafe array Type in Swift. While I glanced through the code example, I asked myself if there isn't a way to achieve this with quite less code.

Suppose I create this class:

// MARK: Class Declaration
class ThreadsafeArray<Element> {
    // Private Variables
    private var __array: [Element] = []
    private var __arrayQueue: DispatchQueue = DispatchQueue(
        label: "ThreadsafeArray.__concurrentArrayQueue",
        attributes: .concurrent
    )
}


// MARK: Interface
extension ThreadSafeArray {
    // ReadWrite Variables
    var threadsafe: [Element] {
        get {
            return self.__arrayQueue.sync {
                return self.__array
            }
        }
        set(newArray) {
            self.__arrayQueue.async(flags: .barrier) {
                self.__array = newArray
            }
        }
    }
}

If, from now on, I only accessed the actual array through .threadsafe, would this suffice to make the array threadsafe?

Also, could I implement it a struct instead of a class to get the mutating checks as well?

I am aware that the objects inside this array would not be threadsafe themselves through this but this is not the point, so let's assume I only put threadsafe stuff in there.

(Of course, to avoid the calls to .threadsafe, I would make the shiny new class conform to ExpressibleByArrayLiteral, Collection and RangeReplaceableCollection, so I can use it like a normal array.


Edit

Meanwhile, I've tried testing it in a playground and have come to believe that it doesn't suffice.

Playground code:

import Foundation
import PlaygroundSupport


PlaygroundPage.current.needsIndefiniteExecution = true

// Testing //
// Thread-unsafe array
func unsafeArray() {
    var array: [Int] = []
    var iterations: Int = 1000
    let start: TimeInterval = Date().timeIntervalSince1970

    DispatchQueue.concurrentPerform(iterations: iterations) { index in
        let last: Int = array.last ?? 0
        array.append(last + 1)

        DispatchQueue.global().sync {
            iterations -= 1

            // Final loop
            guard iterations <= 0 else { return }
            print(String(
                format: "Unsafe loop took %.3f seconds, count: %d.",
                Date().timeIntervalSince1970 - start, array.count
            ))
        }
    }
}

// Thread-safe array
func safeArray() {
    let array: ThreadsafeArray<Int> = ThreadsafeArray<Int>()
    var iterations: Int = 1000
    let start: TimeInterval = Date().timeIntervalSince1970

    DispatchQueue.concurrentPerform(iterations: iterations) { index in
        let last: Int = array.threadsafe.last ?? 0
        array.threadsafe.append(last + 1)

        DispatchQueue.global().sync {
            iterations -= 1

            // Final loop
            guard iterations <= 0 else { return }
            print(String(
                format: "Safe loop took %.3f seconds, count: %d.",
                Date().timeIntervalSince1970 - start, array.threadsafe.count
            ))
        }
    }
}

unsafeArray()
safeArray()

Output:

Most of the time:

experiments(31117,0x7000038d0000) malloc: *** error for object 0x11f663d28: pointer being freed was not allocated

*** set a breakpoint in malloc_error_break to debug

Sometimes:

IndexError: Index out of range

Unfortunately also:

Unsafe loop took 1.916 seconds, count: 994.
Safe loop took 11.258 seconds, count: 515.

Doesn't seem to suffice (also, it's incredibly unperformant).

Upvotes: 2

Views: 2795

Answers (3)

Rob
Rob

Reputation: 437682

The synchronization mechanism in your question, with concurrent queue and judicious use of barrier is known as the “reader-writer” pattern. In short, it offers concurrent synchronous reads and non-concurrent asynchronous writes. This is a fine synchronization mechanism. It is not the problem here.

But there are a few problems:

  1. In the attempt to pare back the implementation, this class has become very inefficient. Consider:

    class ThreadSafeArray<Element> {
        private var array: [Element]
        private let queue = DispatchQueue(label: "ThreadsafeArray.reader-writer", attributes: .concurrent)
    
        init(_ array: [Element] = []) {
            self.array = array
        }
    }
    
    extension ThreadSafeArray {
        var threadsafe: [Element] {
            get { queue.sync { array } }
            set { queue.async(flags: .barrier) { self.array = newValue } }
        }
    }
    
    let numbers = ThreadSafeArray([1, 2, 3])
    numbers.threadsafe[1] = 42                // !!!
    

    What that numbers.threadsafe[1] = 42 line is really doing is as follows:

    • Fetching the whole array;
    • Changing the second item in a copy of the array; and
    • Replacing the whole array with a copy of the array that was just created.

    That is obviously very inefficient.

  2. The intuitive solution is to add an efficient subscript operator in the implementation:

    extension ThreadSafeArray {
        typealias Index = Int
    
        subscript(index: Index) -> Element {
            get { queue.sync { array[index] } }
            set { queue.async(flags: .barrier) { self.array[index] = newValue} }
        }
    }
    

    Then you can do:

    numbers[1] = 42
    

    That will perform a synchronized update of the existing array “in place”, without needing to copy the array at all. In short, it is an efficient, thread-safe mechanism.

    What will end up happening, as one adds more and more basic “array” functionality (e.g., especially mutable methods such as the removing of items, adding items, etc.), you end up with an implementation not dissimilar to the original implementation you found online. This is why that article you referenced implemented all of those methods: It exposes array-like functionality, but offering an efficient and (seemingly) thread-safe interface.

  3. While the above addresses the data races, there is a deep problem in that code sample you found online, as illuminated by your thread-safety test.

    To illustrate this, let’s first assume we flesh out our ThreadSafeArray to have last, append() and make it print-able:

    class ThreadSafeArray<Element> {
        private var array: [Element]
        private let queue = DispatchQueue(label: "ThreadsafeArray.reader-writer", attributes: .concurrent)
    
        init(_ array: [Element] = []) {
            self.array = array
        }
    }
    
    extension ThreadSafeArray {
        typealias Index = Int
    
        subscript(index: Index) -> Element {
            get { queue.sync { array[index] } }
            set { queue.async(flags: .barrier) { self.array[index] = newValue} }
        }
    
        var last: Element? {
            queue.sync { array.last }
        }
    
        func append(_ newElement: Element) {
            queue.async(flags: .barrier) {
                self.array.append(newElement)
            }
        }
    }
    
    extension ThreadSafeArray: CustomStringConvertible {
        var description: String {
            queue.sync { array.description }
        }
    }
    

    That implementation (a simplified version of the rendition found on that web site) looks OK, as it solves the data race and avoids unnecessary copying of the array. But it has its own problems. Consider this rendition of your thread-safety test:

    let numbers = ThreadSafeArray([0])
    
    DispatchQueue.concurrentPerform(iterations: 1_000) { <#Int#> in
        let lastValue = numbers.last! + 1
        numbers.append(lastValue)
    }
    
    print(numbers)                   // !!!
    

    The strict data race is solved, but the result will not be [0, 1, 2, ..., 1000]. The problem are the lines:

    let lastValue = numbers.last! + 1
    numbers.append(lastValue)
    

    That does a synchronized retrieval of last followed by a separate synchronized append. The problem is that another thread might slip in between these two synchronized calls and fetch the same last value! You need to wrap the whole “fetch last value, increment it, and append this new value” in a single, synchronized task.

    To solve this, we would often give the thread-safe object a method that would provide a way to perform multiple statements as a single, synchronized, task. E.g.:

    extension ThreadSafeArray {
        func synchronized(block: @escaping (inout [Element]) -> Void) {
            queue.async(flags: .barrier) { [self] in
                block(&array)
            }
        }
    }
    

    Then you can do:

    let numbers = ThreadSafeArray([0])
    
    DispatchQueue.concurrentPerform(iterations: 1_000) { <#Int#> in
        numbers.synchronized { array in
            let lastValue = array.last! + 1
            array.append(lastValue)
        }
    }
    
    print(numbers)                  // OK
    
  4. So let’s return to your intuition that the author’s class can be simplified. You are right, that it can and should be simplified. But my rationale is slightly different than yours.

    The complexity of the implementation is not my concern. It actually is an interesting pedagogical exercise to understand barriers and the broader reader-writer pattern.

    My concern is that (to my point 3, above), is that the author’s implementation lulls an application developer in a false sense of security provided by the low-level thread-safety. As your tests demonstrate, a higher-level level of synchronization is almost always needed.

    In short, I would stick to a very basic implementation, one that exposes the appropriate high-level, thread-safe interface, not a method-by-method and property-by-property interface to the underlying array, which almost always will be insufficient. In fact, this desire for a high-level, thread-safe interface is a motivating idea behind a more modern thread-safety mechanism, namely actors in Swift concurrency.

Upvotes: 2

jeh
jeh

Reputation: 2443

I suspect this line is your issue:

DispatchQueue.global().sync { ...

If you specify one serial queue you want to use here you should get the result you want.

Something like:

let array = SynchronizedArray<Int>()
var iterations = 1000
let queue = DispatchQueue(label: "queue")

DispatchQueue.concurrentPerform(iterations: 1000) { index in
    array.append(array.last ?? 0)

    queue.sync {
        iterations -= 1

        if iterations == 0 {
            print(array.count)
        }
    }
}

Upvotes: 0

Jake G
Jake G

Reputation: 1195

Another method of locking objects is:

func lock(obj: AnyObject, work:() -> ()) {
    objc_sync_enter(obj)
    work()
    objc_sync_exit(obj)
}

Could your class use this to lock its standard array when needed?

Upvotes: -1

Related Questions