user2890398
user2890398

Reputation: 159

Implementing semaphore by using mutex operations and primitives

Some time ago had an interview and was asked to implement Semaphore by using mutex operations and primitives only (he allowed int to be considered as atomic). I came with solution below. He did not like busy/wait part -- while (count >= size) {} -- and asked to implement locking instead by using more primitive types and mutexes. I did not manage to come with improved solution. Any ideas how it could be done?

struct Semaphore {
int size;
atomic<int> count;
mutex updateMutex;

Semaphore(int n) : size(n) { count.store(0); }

void aquire() {
    while (1) {
        while (count >= size) {}
        updateMutex.lock();
        if (count >= size) {
            updateMutex.unlock();
            continue;
        }
        ++count;
        updateMutex.unlock();
        break;
    }
}

void release() {
    updateMutex.lock();
    if (count > 0) {
        --count;
    } // else log err
    updateMutex.unlock();
}
};

Upvotes: 13

Views: 24608

Answers (7)

Dengzhi Zhang
Dengzhi Zhang

Reputation: 127

Mutex and Condition_Variable are two most fundamental synchronization primitives. Mutex is used to exclusively access resource while Condition_Variable is used to signal. They are defined in C++11. The Semaphore is more advanced synchronization primitive. They are only defined until C++20.

If I were asked to implement semaphore, I would do

// C++20 has provided the semaphore in <semaphore> head
class Semaphore
{
    size_t avail;
    std::mutex mtx;
    std::condition_variable cv;

public:
    Semaphore(int avail = 1) : avail(avail){}

    void acquire() // P(x): named from the Dutch word proberen, which means to test.
    {
        std::unique_lock<std::mutex> lk(mtx);
        cv.wait(lk, [this](){return avail > 0;});
        avail--;
    }

    void release() // V(x): named from the Dutch word verhogen, which means to increment.
    {
        std::unique_lock<std::mutex> lk(mtx);
        avail++;
        cv.notify_one();
    }
};

Upvotes: 1

user3296587
user3296587

Reputation: 151

Who said you could only use one or two mutexes?

class Semaphore {
    private member tickets: Cell with int = Cell(0);
    private member waiting: List with Mutex = List();

    public trywaitfor(): bool {
        for (;;) {
            local n = tickets.value;
            if (n <= 0)
                return false;
            if (tickets.cmpxch(n, n - 1) === n)
                return true;
        }
    }

    public waitfor() {
        for (;;) {
            if (trywaitfor())
                return;
            local m = Mutex();

            /* First acquire won't block */
            m.acquire();

            /* Schedule our mutex as waiting */
            waiting.append(m);

            /* Prevent race condition in case a
             * ticket was posted in the mean-time */
            if (trywaitfor()) {
                if (!waiting.removeif(e -> e === m)) {
                    /* Race condition: our mutex was removed
                     * from the list of waiters by another
                     * thread calling `release' and picking
                     * us (even though we didn't actually end
                     * up waiting). To prevent the loss of a
                     * post-signal, forward said signal to
                     * the next waiting thread. */
                    wakeone();
                }
                return;
            }

            /* Second acquire blocks until released */
            m.acquire();
        }
    }

    private wakeone() {
        local m;
        try {
            m = waiting.popfront();
        } catch (...) {
            /* Catch handler for when popfront()
             * fails because the list is empty */
            return;
        }
        /* Wake up a waiting thread */
        m.release();
    }

    public post() {
        local n;
        /* Add a new ticket */
        do {
            n = tickets.value;
        } while (!tickets.cmpxch(n, n + 1));

        /* Wake up at most 1 waiting thread */
        wakeone();
    }
}

This assumes that:

  • The Mutex type isn't recursive
  • The thread calling Mutex.release() doesn't have to be the same one that called Mutex.acquire() (Mutex implementations that aren't recursive usually allow this as well, since disallowing it would already require identifying the thread that did the acquire, which is wholly unnecessary if you don't want to be recursive)
    • Yes: I know that means that it's not technically a "Mutex", but rather a "Lock", but come on: pleanty of people use those two terms interchangably.
  • All operations on the Cell (an atomic object container) and List (it's a list, duh) types are atomic (but even if they're weren't, you can just add another mutex around all calls to tickets and waiting)

I guess your interviewer meant lists/vectors/arrays when he was talking about "more primitive types"

Upvotes: 0

Ahmed Elbadawy
Ahmed Elbadawy

Reputation: 71

That's true because technically there are some parts in your code that have no need to exist. 1- you used atomic datatypes atomic<int> count; which will take very few more cycles in execution and it is useless as long as incrementing and decrementing are locked by updateMutex.lock(); code so there is no other thread can change it during the locked state.

2- you put while (count >= size) {} which is also useless because you checked count again after the spinlock statement which is necessary and the one important here. "remember spinlock is a while(1)" when the mutex is taken by another thread.

besides if you decided to use int count; with some compiler's optimizations, maybe your code won't re-read count value!! for optimization, remember your semaphore is supposed to be used by different threads!! so you need to make it volatile, to avoid this problem.

at last, let me rewrite your code in a more performant way.

struct Semaphore {
int size;
volatile int count;
mutex updateMutex;

Semaphore(int n) : size(n), count(0) {}

void aquire() {
    while (1) {
        updateMutex.lock();
        if (count >= size) {
            updateMutex.unlock();
            continue;
        }
        ++count;
        updateMutex.unlock();
        break;
    }
}

void release() {
    updateMutex.lock();
    if (count > 0) {
        --count;
    } // else log err
    updateMutex.unlock();
    }

 };

Upvotes: 1

XYZ
XYZ

Reputation: 189

lemme try this

`

# number of threads/workers
w = 10
# maximum concurrency
cr = 5
r_mutex = mutex()
w_mutex = [mutex() for x in range(w)]

# assuming mutex can be locked and unlocked by anyone 
# (essentially we need a binary semaphore)

def acquire(id):

    r_mutex.lock()
    cr -= 1
    # r_mutex.unlock()
    
    # if exceeding maximum concurrency
    if cr < 0:
        # lock twice to be waken up by someone
        w_mutex[id].lock()
        r_mutex.unlock()
        w_mutex[id].lock()
        w_mutex[id].unlock()
        return

    r_mutex.unlock()

    
    
       
def release(id):

    r_mutex.lock()
    cr += 1
    # someone must be waiting if cr < 0
    if cr <= 0:
        # maybe you can do this in a random order
        for w in w_mutex:
            if w.is_locked():
                w.unlock()
                break
    r_mutex.unlock()

`

Upvotes: -2

dekuShrub
dekuShrub

Reputation: 486

EDIT - use a second mutex for queuing intstead of threads

Since a mutex already have proper thread-support, it can be used to queue the threads (instead of doing it explicitly as I first had tried to do). Unless the mutex is restricted to only let the owner unlock it (a lock?), then this solution doesn't work.

I found the solution in Anthony Howe's pdf that I came across when searching. There are two more solutions given there as well. I changed the names to make more sense for this example.

more or less pseudo code:

Semaphore{
    int n;
    mutex* m_count;         //unlocked initially
    mutex* m_queue;         //locked initially
};

void wait(){
    m_count.lock();
    n = n-1;
    if(n < 0){
        m_count.unlock();
        m_queue.lock();     //wait
    }
    m_count.unlock();       //unlock signal's lock
}

void signal(){
    m_count.lock();
    n = n+1;

    if(n <= 0){
        m_queue.unlock();   //leave m_count locked
    }
    else{
        m_count.unlock();
    }
}

Upvotes: -1

Joerg Simon
Joerg Simon

Reputation: 421

as @chill pointet out, the solution I did write down here will not work, as locks have unique ownership. I guess in the end you will revert to busy wait (if you are not allowed to use condition variables). I leave it here if ppl. have the same idea they see that this DOES NOT WORK ;)

struct Semaphore {
int size;
atomic<int> count;
mutex protection;
mutex wait;

Semaphore(int n) : size(n) { count.store(0); }

void aquire() {
    protection.lock();
    --count;
    if (count < -1) {
        protection.unlock();
        wait.lock();
    }
    protection.unlock();
}

void release() {
    protection.lock();
    ++count;
    if (count > 0) {
        wait.unlock();
    }
    protection.unlock();
}
};

Upvotes: 0

chill
chill

Reputation: 16888

I'd wager this is not possible to implement without a busy-loop using mutexes only.

If not busy-looping, you have to block somewhere. The only blocking primitive you've got is a mutex. Hence, you have to block on some mutex, when the semaphore counter is zero. You can be woken up only by the single owner of that mutex. However, you should woken up whenever an arbitrary thread returns a counter to the semaphore.

Now, if you are allowed condition variables, it's an entirely different story.

Upvotes: 10

Related Questions