devsda

Reputation: 4232

Doubts in code of multi-threading using ArrayBlockingQueue and mutex

I am trying to write multithreaded code, but I don't know where to start. Please help me.

My task is,

  1. There is one queue of length 1, called pending_tasks, that holds tasks which require some processing.
  2. There is another queue of length 1, called completed_tasks, that holds tasks which have completed processing and are ready to deliver.

My implementation idea,

  1. First, create two blocking queues, pending_tasks and completed_tasks.
  2. One thread (producer) always listens for tasks that come from outside and puts each one it receives into pending_tasks.
  3. One thread (consumer) is always ready to take a task from pending_tasks, processes it, and then puts it into completed_tasks.
  4. The consumer then goes back to pending_tasks and repeats the same processing whenever a new task arrives.
  5. Basically, it's a single-producer, single-consumer problem.

My confusion,

I know that it can be coded using ArrayBlockingQueue and a mutex, but I don't understand how to start. I have a good understanding of mutexes (I read about them from this link) and of BlockingQueue as well, since I have read lots of questions on this site.

Can you please give me some implementation guidance so that I can write this multithreaded code?

I already wrote some code for this, but it does not achieve the final goal of my task.

Thanks in advance. Looking for your kind reply.

EDIT NO. 1

Please see my code below. It works fine, but it is missing one piece of functionality. Please give me some guidance on how to add it.

The functionality is,

  1. When the producer thread puts a value in the pending_task queue, it waits there for some time. If the consumer delivers the result within that time, that's fine. Otherwise, it reports a timeout, and the producer takes another value, puts it in the pending_task queue, and the same process starts again.

Please help me add the above functionality. I think the producer and consumer threads have to communicate, and that thread communication is done using a mutex (I think). Please help me implement this.

My code,

MultiThread Class

package multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MultiThread {

    public static BlockingQueue<Integer> pending_task;
    public static BlockingQueue<Integer> completed_task;

    public MultiThread(int length) {
        pending_task = new ArrayBlockingQueue<Integer>(length, true);
        completed_task = new ArrayBlockingQueue<Integer>(length, true);
    }
}

Producer Class

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("PRODUCER: Try to put value  " + i + "  in the pending queue");
                MultiThread.pending_task.put(i);
                System.out.println("PRODUCER: Successfully put value  " + i + "  in the pending queue, now its turn to consumer");
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

Consumer Class

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("CONSUMER: Try to take value from the pending queue");
                int val = MultiThread.pending_task.take();
                System.out.println("CONSUMER:  Successfully take value, and that is   " + val);
                System.out.println("CONSUMER: Processing starts");
                Thread.sleep(1000);
                System.out.println("CONSUMER: Processing ends");
                System.out.println("CONSUMER: Try to put that value in completed queue, and the value is   " + val);
                MultiThread.completed_task.put(val);
                System.out.println("CONSUMER: Successfully put into completed queue");

                //Serve this value to the corresponding user
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }
}

DeliveryBoy Class

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class DeliveryBoy implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
                int val = MultiThread.completed_task.take();
                System.out.println("DELIVERYBOY:  Successfully take value from completed queue and the value is  " + val);
                //Serve this value to the corresponding user
            } catch (InterruptedException ex) {
                Logger.getLogger(DeliveryBoy.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }
}

Test Class

package multithread;

public class Test {

    public static void main(String[] args) {
        // Create the shared queues (capacity 1), then start the three threads
        MultiThread ml = new MultiThread(1);
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
        new Thread(new DeliveryBoy()).start();
    }
}

Upvotes: 0

Views: 297

Answers (1)

Cratylus

Reputation: 54094

From ArrayBlockingQueue#put

public void put(E e) throws InterruptedException

Inserts the specified element at the tail of this queue, waiting for space to become available if the queue is full.

From ArrayBlockingQueue#take

public E take() throws InterruptedException

Description copied from interface: BlockingQueue
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
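As a rough illustration of the blocking behaviour these two excerpts describe, here is a minimal sketch with one producer and one consumer sharing a queue of capacity 1. The class name PutTakeSketch and the helper runOnce are hypothetical names made up for this example; they are not from the question's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PutTakeSketch {

    // Hypothetical helper: runs one producer and one consumer over a single
    // bounded queue and returns the sum of everything the consumer received.
    static int runOnce(int count) throws InterruptedException {
        BlockingQueue<Integer> pending = new ArrayBlockingQueue<>(1);
        int[] sum = new int[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    pending.put(i); // blocks while the single slot is occupied
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += pending.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join(); // join() makes the consumer's writes visible here
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sum = " + runOnce(10));
    }
}
```

Neither thread takes an explicit lock here: the queue does the waiting and the hand-off itself.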

So all you need to do is call these methods from your threads.
Try this (study the javadoc) and when you have a more specific problem you can ask again.
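For the timeout described in the question's edit, BlockingQueue also has timed variants of these methods: offer(e, timeout, unit) and poll(timeout, unit). One possible approach, assuming the producer itself waits for the result, is to put the task and then do a timed poll on the completed queue. The sketch below is only that, a sketch: TimeoutSketch, submitAndWait and demo are hypothetical names, and the Thread.sleep call stands in for real processing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    // Hypothetical helper: hand one task to the consumer, then wait at most
    // timeoutMillis for a result. Returns null if the wait timed out.
    static Integer submitAndWait(BlockingQueue<Integer> pending,
                                 BlockingQueue<Integer> completed,
                                 int task, long timeoutMillis) throws InterruptedException {
        pending.put(task);
        return completed.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Hypothetical demo: one consumer that needs workMillis to process a task.
    static Integer demo(int task, long workMillis, long timeoutMillis) throws InterruptedException {
        BlockingQueue<Integer> pending = new ArrayBlockingQueue<>(1);
        BlockingQueue<Integer> completed = new ArrayBlockingQueue<>(1);

        Thread consumer = new Thread(() -> {
            try {
                int val = pending.take();
                Thread.sleep(workMillis); // simulated processing (assumption)
                completed.put(val);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        try {
            return submitAndWait(pending, completed, task, timeoutMillis);
        } finally {
            consumer.interrupt(); // stop the worker if it is still busy
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fast consumer: " + demo(7, 0, 5000));
        System.out.println("slow consumer: " + demo(7, 2000, 50));
    }
}
```

A null return from submitAndWait is the "time out" case, after which the producer can move on to the next value. Note that with a long-running consumer a late result could still arrive in the completed queue after the timeout, so a real implementation would probably need to tag results with a task id or cancel the work; the demo sidesteps this by interrupting the consumer. No separate mutex is needed for any of this, since the queue implementations are already thread-safe.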

Upvotes: 1
