Danny

Reputation: 2683

Implement thread safe deque

As part of a server application to run on macOS, I need to implement a thread-safe deque in order to feed jobs to a thread pool. I'm new to Swift.

In C++ it would look like:

#include <deque>
#include <mutex>

class JobQueue
{
public:
    size_t size()
    {
        // The lock is released when guard goes out of scope.
        std::lock_guard<std::mutex> guard(mMutex);
        return mJobFifo.size();
    }
    
    void push(const Job &job)
    {
        std::lock_guard<std::mutex> guard(mMutex);
        mJobFifo.push_back(job);
    }

    // Precondition: the queue is not empty.
    Job pop()
    {
        std::lock_guard<std::mutex> guard(mMutex);
        Job tmp = mJobFifo.front();
        mJobFifo.pop_front();
        return tmp;
    }
    
private:
    std::deque<Job> mJobFifo;
    std::mutex      mMutex;
};

In Swift, I've tried the code below but size() always returns zero despite the backing Deque having many elements.

I've read a few articles that say to use mQueue.async(flags: .barrier) to get exclusive access (like std::lock_guard?), but the function's return value is not updated.

I also tried putting the return ret inside the .async, but it gave the error "Cannot convert value of type 'Int' to closure result type 'Void'":

    mQueue.async(flags: .barrier)
    {
        ret = self.mJobFifo.count
        print("internal size: \(ret)")
        return ret
    }

Is this the best way to implement such a thread-safe queue? If so, then what am I doing wrong? If not, then how should it be done?

import Foundation
import DequeModule
class OcrJob
{
    static var lastJobId : Int = 0
    var jobId : Int
    
    init()
    {
        jobId = OcrJob.lastJobId
        OcrJob.lastJobId += 1
    }
}

class OcrJobQueue
{
    init(max: UInt)
    { 
        mMaxSize = max
    }
    
    public func size() -> Int
    {
        var ret = 0
        mQueue.async(flags: .barrier)
        {
            ret = self.mJobFifo.count
            print("internal size: \(ret)")
        }
        
        print("size returns \(ret). ")
        return ret
    }
    
    public func pushBack(job: OcrJob) throws
    {
        let curSize = mJobFifo.count
        guard (curSize < mMaxSize) else
        {
            print("pushBack - curSize: \(curSize), mMaxSize \(mMaxSize)")
            throw QueueError.full
        }
        
        mQueue.async(flags: .barrier)
        {
            self.mJobFifo.append(job)
            print("pushBack: \(self.mJobFifo.count) jobs in queue")
        }
    }
    
    public func popFirst() -> OcrJob?
    {
        var ret : OcrJob? = nil
        mQueue.async(flags: .barrier)
        {
            ret = self.mJobFifo.popFirst()
            print("popFirst: \(self.mJobFifo.count) jobs in queue")
        }
        
        return ret
    }
    
    private var mMaxSize : UInt = 10
    private var mQueue = DispatchQueue(label: "JobQueue", attributes: .concurrent)
    private var mJobFifo : Deque<OcrJob> = []
    
}

Upvotes: 0

Views: 80

Answers (1)

Paulw11

Reputation: 114992

Probably the simplest way to solve this is to use a Swift actor. An actor isolates its mutable state so that only one task can access it at a time, with no explicit locks or dispatch queues.

I would also change OcrJob to be a struct so that it is a value type, and eliminate the static property, because that is more mutable state to be concerned with.

import Foundation // for UUID

struct OcrJob {

   let jobId = UUID()

}

actor OcrJobQueue {

    let maxSize: UInt
    private var jobFifo = [OcrJob]()

    init(max: UInt = 10)
    { 
        maxSize = max
    }

    public var size: Int 
    {
        return self.jobFifo.count
    }
    
    public func pushBack(job: OcrJob) throws
    {
        guard (jobFifo.count < maxSize) else
        {
            throw QueueError.full
        }
        jobFifo.append(job)
    }
    
    public func popFirst() -> OcrJob?
    {
        guard !self.jobFifo.isEmpty else {
            return nil
        }
        return self.jobFifo.removeFirst()
    }
}

Note that you will now need to interact with your OcrJobQueue from an asynchronous context, and because pushBack can throw, its call needs try as well:

try await jobQueue.pushBack(job: someJob)

let nextJob = await jobQueue.popFirst()
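
To show how the pieces might fit together, here is a minimal, self-contained sketch of the actor being filled and then drained by two concurrent worker tasks. QueueError is not defined anywhere in the question or in the code above, so the enum below is an assumption, and runDemo is a hypothetical driver added purely for illustration. Treat it as one possible arrangement, not a finished thread pool.

```swift
import Foundation

// Assumption: QueueError is referenced above but never defined;
// this minimal definition is hypothetical.
enum QueueError: Error {
    case full
}

struct OcrJob {
    let jobId = UUID()
}

actor OcrJobQueue {
    let maxSize: UInt
    private var jobFifo = [OcrJob]()

    init(max: UInt = 10) {
        maxSize = max
    }

    var size: Int { jobFifo.count }

    func pushBack(job: OcrJob) throws {
        guard jobFifo.count < maxSize else { throw QueueError.full }
        jobFifo.append(job)
    }

    func popFirst() -> OcrJob? {
        jobFifo.isEmpty ? nil : jobFifo.removeFirst()
    }
}

// Hypothetical driver: push four jobs into a queue of capacity three,
// then let two concurrent workers drain it.
func runDemo() async -> (rejected: Bool, processed: Int, remaining: Int) {
    let queue = OcrJobQueue(max: 3)
    var rejected = false
    for _ in 0..<4 {
        do {
            try await queue.pushBack(job: OcrJob())
        } catch {
            rejected = true // the fourth push exceeds maxSize
        }
    }

    let processed = await withTaskGroup(of: Int.self) { group -> Int in
        for _ in 0..<2 {
            group.addTask {
                var count = 0
                // Each popFirst call runs in isolation on the actor,
                // so no job is handed to both workers.
                while let _ = await queue.popFirst() {
                    count += 1
                }
                return count
            }
        }
        var total = 0
        for await n in group { total += n }
        return total
    }

    let remaining = await queue.size
    return (rejected, processed, remaining)
}
```

Calling await runDemo() from a Task or any other async context should report that the fourth push was rejected and that the two workers processed all three queued jobs between them. The workers here simply stop when the queue is empty; a real pool would need some way to wait for new jobs, which this sketch does not cover.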

Upvotes: 1
