hpet

Reputation: 299

Set up reader, decoder and consumer threads

I have a fairly standard pair of producer and consumer threads:

It happens that the decoding process is the bottleneck and would probably benefit from more CPU: it takes 70% of the producer's time. Would I gain any noticeable performance if I introduced a separate "decoder" thread?

I need to use a single queue because of the memory footprint. I can't afford two queues (bytes/items), so I guess there will be some object "casting" overhead?

Any ideas on how to implement this three-thread solution?

Thank you!

Upvotes: 1

Views: 201

Answers (2)

necromancer

Reputation: 24651

Use two queues: one holds the undecoded objects, and multiple decoder threads take their input from it.

Those decoder threads decode and write the decoded objects to the second queue, from which the ultimate consumers consume.

Make sure to avoid deadlock (use notifyAll(), not notify(), unless you really know what you are doing).
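To illustrate the notifyAll() advice, here is a minimal sketch of a hand-rolled bounded buffer. The class name `BoundedBuffer` and its methods are hypothetical, made up for this example; in real code `java.util.concurrent.ArrayBlockingQueue` already provides the same behaviour. Producers and consumers wait on the same monitor, so a plain notify() could wake another producer instead of the consumer that is able to make progress, which is how such code can stall.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical bounded buffer built on wait()/notifyAll(), for illustration only. */
final class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) { // re-check in a loop: wakeups can be spurious
            wait();
        }
        items.add(item);
        notifyAll(); // wake all waiters; producers and consumers share this monitor
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // a slot is free again, blocked producers may continue
        return item;
    }

    public synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i); // blocks whenever two items are already waiting
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println(buffer.take());
        }
        producer.join();
    }
}
```

With one producer and one consumer the items come out in insertion order. The `while` loops around `wait()` matter as much as `notifyAll()`: a woken thread must re-check its condition before proceeding.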

Upvotes: 0

Scorpion

Reputation: 3986

You should tune the thread pools for the producer and consumer. For example, if the consumer is much faster than the producer, its thread pool could be allocated fewer threads than the producer's. This can increase throughput noticeably. The ratio of producer to consumer threads should be tuned (for example, 3:1).

Along similar lines, you could have three thread pools in which the producer (reader) and consumer have fewer threads, while the decoder (transformer) pool has more. I am not sure whether you need code examples; if you do, you should share what you currently have. I would start with thread pools of size 1 for the producer and consumer and size 5 for the transformer (decoder), then measure where the bottleneck is and whether the throughput meets your expectations.
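Before measuring, a rough upper bound on the gain can be worked out from the 70% figure in the question. The sketch below assumes a single reader thread, enough free cores, a consumer that is never the bottleneck, and zero queueing overhead, so real numbers will be lower. `PipelineEstimate` and `maxSpeedup` are names invented for this illustration.

```java
/** Back-of-the-envelope bound on pipeline speedup; all names here are illustrative. */
final class PipelineEstimate {
    /**
     * Upper bound on speedup relative to one thread that reads and decodes sequentially.
     *
     * @param decodeFraction share of per-item time spent decoding (0.7 in the question)
     * @param decoderThreads number of threads dedicated to decoding
     */
    static double maxSpeedup(double decodeFraction, int decoderThreads) {
        // Take the sequential cost of one item as 1 time unit.
        double readRate = 1.0 / (1.0 - decodeFraction);      // items per unit the reader can supply
        double decodeRate = decoderThreads / decodeFraction; // items per unit the decoders can absorb
        return Math.min(readRate, decodeRate);               // the slowest stage limits the pipeline
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("decoders=%d  max speedup=%.2f%n", n, maxSpeedup(0.7, n));
        }
    }
}
```

Under these assumptions one decoder thread gives at most about 1.43x, two give about 2.86x, and from three onward the reader becomes the limit at about 3.33x. So a pool of 5 is a reasonable starting point, but some of those threads may sit idle unless reading is also sped up.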

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerDecoderConsumer {
    private static final int DECODER_THREADS = 5;
    // Arbitrary capacity; bounded so that put() blocks instead of exhausting memory.
    private static final int QUEUE_CAPACITY = 1024;

    public static void main(String[] args) {
        // Bounded FIFO queues: a PriorityBlockingQueue is unbounded and would reorder items.
        BlockingQueue<Integer> inputQueue = new ArrayBlockingQueue<Integer>(QUEUE_CAPACITY);
        BlockingQueue<String> outputQueue = new ArrayBlockingQueue<String>(QUEUE_CAPACITY);
        ExecutorService reader = Executors.newSingleThreadExecutor();
        reader.submit(new Producer(inputQueue));
        ExecutorService decoder = Executors.newFixedThreadPool(DECODER_THREADS);
        // One task per pool thread; a single submit() would leave four threads idle.
        for (int i = 0; i < DECODER_THREADS; i++) {
            decoder.submit(new Transformer(inputQueue, outputQueue));
        }
        ExecutorService writer = Executors.newSingleThreadExecutor();
        writer.submit(new Consumer(outputQueue));
    }

    private static class Producer implements Callable<Void> {
        final BlockingQueue<Integer> queue;

        public Producer(final BlockingQueue<Integer> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                Random random = new Random();
                while (true) {
                    queue.put(random.nextInt());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }

    private static class Transformer implements Callable<Void> {
        final BlockingQueue<Integer> inputQueue;

        final BlockingQueue<String> outputQueue;

        public Transformer(final BlockingQueue<Integer> pInputQueue, final BlockingQueue<String> pOutputQueue) {
            inputQueue = pInputQueue;
            outputQueue = pOutputQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                while (true) {
                    Integer input = inputQueue.take();
                    String output = String.valueOf(input); // decode input to output
                    outputQueue.put(output); // output
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop decoding.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }

    private static class Consumer implements Callable<Void> {
        final BlockingQueue<String> queue;

        public Consumer(final BlockingQueue<String> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                while (true) {
                    System.out.println(queue.take());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }
}

I have added some code to illustrate the idea. I am using two blocking queues, unlike the single queue mentioned in your question, because I don't think the extra queue adds meaningful overhead; I would suggest using a profiler to check. I hope you find it useful and can retrofit it to the single-queue model should you really feel the need.
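If the single-queue constraint really is firm, one possible retrofit is to let the reader hand each raw item to a decoder pool and put the resulting Future into one bounded queue, which the consumer then drains. This is only a sketch under assumptions: `SingleQueuePipeline`, `run` and `decode` are hypothetical names, and `decode` stands in for the real decoding step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical single-queue variant: the queue holds Futures of decoded items. */
final class SingleQueuePipeline {
    /** Stand-in for the real decoding step. */
    static String decode(int raw) {
        return "item-" + raw;
    }

    /** Reads 0..count-1, decodes in parallel, and returns the results in reading order. */
    static List<String> run(final int count, int decoderThreads, int queueCapacity) throws Exception {
        final ExecutorService decoders = Executors.newFixedThreadPool(decoderThreads);
        // The only application-level queue; its capacity bounds the items in flight.
        final BlockingQueue<Future<String>> queue = new ArrayBlockingQueue<>(queueCapacity);

        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    final int raw = i; // "read" one raw item
                    Callable<String> task = () -> decode(raw);
                    queue.put(decoders.submit(task)); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        List<String> consumed = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                consumed.add(queue.take().get()); // waits until this item has been decoded
            }
        } finally {
            reader.interrupt();     // harmless if the reader has already finished
            decoders.shutdownNow(); // all tasks are done on the normal path
        }
        reader.join();
        return consumed;
    }

    public static void main(String[] args) throws Exception {
        for (String item : run(10, 5, 4)) {
            System.out.println(item);
        }
    }
}
```

Because the consumer takes Futures in the order the reader queued them, output order matches input order even though decoding runs in parallel, and no casting is needed. The fixed thread pool does keep an internal work queue of its own, but it can never hold more pending tasks than the bounded queue admits plus one, so memory use stays capped by `queueCapacity`.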

Upvotes: 0
