HazirBot
HazirBot

Reputation: 323

Semaphore-implemented Producer-Consumer oriented thread pool

I am currently working on an educational assignment in which i have to implement a Semaphore only thread-safe thread pool.

I mustn't use during my assignment: Synchronize wait notify sleep or any thread-safe API's.

firstly without getting too much into the code i have:

The design itself:

Shared:

Worker Threads:

General Info:

So we have a mix of a single producer, a single manager and several consumers.

My question:

during the app's runtime the function dequeue_worker() returns a null worker, even though a semaphore is placed to protect access to the queue when it is known that there are no available worker threads.

i have "solved" the problem by recursively calling dequeue_worker() if it draws a null thread, BUT doing so is suppose to make an acquisition of a semaphore permit lost forever. yet when i limit the amount of workers to 1 the worker does not get blocked forever.

1) what's the break-point of my original design?

2) why doesn't my "solution" break the design even further?!

Code snippets:

// only gets called by Worker threads: enqueue_worker(this);
    private void enqueue_worker(Worker worker) {
       available_queue.add(worker);
       available.release();
    }

// only gets called by App-Main (a single thread)
    public void enqueue_task(Query query) {
        tasks_queue.add(query);
        tasks.release();
    }

// only gets called by Manager(a single Thread) 
    private Worker dequeue_worker() {
        Worker worker = null;
        try {
            available.acquire();
            worker = available_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } // **** the solution: ****
        if (worker==null) worker = dequeue_worker(); // TODO: find out why
        return worker;
    }

// only gets called by Manager(a single Thread) 
    private Query dequeue_task() {
        Query query = null;
        try {
            tasks.acquire();
            query = tasks_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } 
        return query;
    }

// gets called by Manager (a single thread)
    private void execute() { // check if task is available and executes it
        Worker worker = dequeue_worker(); // available.down()
        Query query = dequeue_task(); //task.down()
        worker.setData(query);
        worker.blocked.release();
    }

And finally Worker's Run() method:

while (true) { // main infinite loop

                enqueue_worker(this);
                acquire(); // blocked.acquire();
                <C.S>
                available.release();
            }

Upvotes: 1

Views: 374

Answers (1)

hoaz
hoaz

Reputation: 10153

You are calling available.release() twice, once in enqueue_worker, second time in a main loop.

Upvotes: 3

Related Questions