Christopher Francisco
Christopher Francisco

Reputation: 16278

Synchronizing queue in Java on multiple threads

I understand the concept of synchronizing, but I'm now sure about why is it implemented that way, so I need a little help in here:

I have 2 threads:

PeriodicalThread will receive data periodically (let's say each 5 seconds) and put it in a Queue (using ArrayDeque for the moment, but I don't know if any other Queue implementation will work better)

ProccessThread will constantly check on the Queue to see if it is empty. If it is not empty, it will process the data (FIFO).

So, at first my implementation would be:

// Both threads are inner class so they have access to Queue

private Queue queue;
private boolean isReadyToProccess;


class PeriodicalThread extends Thread {
    public void run() {
        while(true) {
           if(isNewDataAvailable) {
                // create Data object
                queue.add(data);
           }
        }
    }
}

class ProcessThread extends Thread {
    public void run() {
        while(true) {
           if(!queue.isEmpty() && isReadyToProccess) {
               Data data = queue.poll();
               processData(data);
           }
        }
    }
}

private void processData(Data data) {
    // this method send data over network, and the server response callback
    // changes isReadyToProcess value to true.
}

Then when wanting to handle synchronization, I wouldn't know If I should use a lock object (and how is it implemented) or if there is already a package Queue implementation that is thread-safe (because of the add() and poll() methods)

Edit: I forgot about the flag isReadyToProcess indicating next queue Data object is... well, ready to be proccessed. This flag should be synchronized also.

Upvotes: 7

Views: 10924

Answers (6)

bekce
bekce

Reputation: 4310

Apache Common Collections library includes a SynchronizedQueue class that can make any custom Queue implementation thread-safe.

SynchronizedQueue.synchronizedQueue(new MyAwesomeNonThreadSafeQueue()

Upvotes: 0

vach
vach

Reputation: 11377

You can use java Blocking Queue or Collections.synchronizedList(new LinkedList<>()), I like Collections.synchronized... as it is effectively making your collection thread safe and you are not bothering about locks and flags and similar stuff... just writing logical readable code.

LinkedList will provide you addFirst() addLast() and ofcourse getFirst(), getLast() to implement FIFO or LIFO behaviour...

Upvotes: 0

M Sach
M Sach

Reputation: 34424

You are looking for Blocking Queue implementation

This provides the functionality out of box which you are looking for. Thats why its best suited for producer consumer examples.

Here is an example

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}


public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Upvotes: 4

Andres
Andres

Reputation: 10717

Before Java 5 you'd have to implement a wait/notify mechanism. Since Java 5 you can use an implementation of the BlockingQueue interface for producer/consumer problems.

Take a look at this:

http://www.javamex.com/tutorials/synchronization_producer_consumer.shtml

Upvotes: 1

Luiggi Mendoza
Luiggi Mendoza

Reputation: 85779

ArrayDeque doesn't support concurrency. Instead, use a real queue that supports concurrent work like BlockingQueue and one of its implementations in java.util.concurrent package. I recommend using LinkedBlockingQueue.

In case you need to share flags between your threads, it would be better using AtomicBoolean instead of manually synchronizing a primitive boolean field.

Note: if you will work with concurrent process, it is better to work with classes provided by java.util.concurrent package that already support lock and synchronization out-of-the-box.

Upvotes: 7

Arash Saidi
Arash Saidi

Reputation: 2238

Look at ArrayBlockingQueue and another BlockingQueue implementations.

From documentation:

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

An alternative is also the ConcurrentLinkedQueue.

You can read more about concurrent Queues here:

http://docs.oracle.com/javase/tutorial/collections/implementations/queue.html

Upvotes: 1

Related Questions