beta

Reputation: 5686

Access Queue from two separate Threads in parallel

My goal is to measure the performance of a Streaming Engine. It's basically a library to which I can send data packages. The idea is to generate data, put it into a queue, and let the Streaming Engine grab the data and process it.

I thought of implementing it like this: the Data Generator runs in a thread and generates data packages in an endless loop, with a Thread.sleep(X) at the end of each iteration. During the tests, the idea is to reduce this Thread.sleep(X) step by step to see whether it has an impact on the Streaming Engine's performance. The Data Generator writes the created packages into a queue, namely a ConcurrentLinkedQueue that is also a Singleton.

In another thread I instantiate the Streaming Engine, which continuously removes the packages from the queue by calling queue.remove(). This is done in an endless loop without any sleeping, because it should run as fast as possible.

In my first attempt to implement this I ran into a problem. The Data Generator does not seem able to put the packages into the queue as fast as it should; it is too slow. My suspicion is that the endless loop of the Streaming Engine thread is eating up all the resources and therefore slowing down everything else.

I would be glad for suggestions on how to approach this issue, or for other design patterns that could solve it elegantly.

The requirements are: two threads that run in parallel. One puts data into a queue; the other reads/removes from the queue. I also want to measure the size of the queue regularly, in order to know whether the engine reading/removing from the queue is fast enough to process the generated packages.

Upvotes: 1

Views: 1877

Answers (1)

PeterK

Reputation: 1723

You can use a BlockingQueue, for example an ArrayBlockingQueue. It is created with a fixed capacity, so the number of queued items never exceeds that limit. Its take() method also blocks while the queue is empty, so the consumer waits instead of spinning in a busy loop. For example:

// create queue, max size 100
final ArrayBlockingQueue<String> strings = new ArrayBlockingQueue<>(100);
final String stop = "STOP";

// start producing
Runnable producer = new Runnable() {
  @Override
  public void run() {
    try {
      for(int i = 0; i < 1000; i++) {
        strings.put(Integer.toHexString(i));
      }
      strings.put(stop);
    } catch(InterruptedException ignore) {
    }
  }
};

Thread producerThread = new Thread(producer);
producerThread.start();

// start monitoring
Runnable monitor = new Runnable() {
  @Override
  public void run() {
    try {
      while (true){
        System.out.println("Queue size: " + strings.size());
        Thread.sleep(5);
      }
    } catch(InterruptedException ignore) {
    }
  }
};
Thread monitorThread = new Thread(monitor);
monitorThread.start();

// start consuming
Runnable consumer = new Runnable() {
  @Override
  public void run() {
    // infinite loop; returns once the stop marker is taken from the queue
    try {
      while(true) {
        String value = strings.take();
        if(value.equals(stop)){
          return;
        }
        System.out.println(value);
      }
    } catch(InterruptedException ignore) {
    }
  }
};

Thread consumerThread = new Thread(consumer);
consumerThread.start();

// wait for producer and consumer to finish
producerThread.join();
consumerThread.join();

// the consumer has already returned; interrupt the monitor so it exits too
monitorThread.interrupt();

As the monitor thread in the example shows, you can have a third thread watch the size of the queue, to give you an idea of which thread is outpacing the other.

Also, you can use the timed or untimed offer methods (and poll on the consuming side), which give you more control over what to do if the queue is full or empty. In the example above, put blocks until there is space for the next element, and take blocks until an element is available.
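To illustrate the timed variants, here is a minimal sketch. It assumes a tiny capacity of 2 and a 10 ms timeout purely for demonstration; the class name TimedQueueDemo and the helpers tryPublish and tryConsume are hypothetical names, not part of any library. Only offer(e, timeout, unit) and poll(timeout, unit) are actual BlockingQueue methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedQueueDemo {

    // Hypothetical helper: waits up to timeoutMs for free space.
    // Returns false if the queue stayed full for the whole timeout.
    public static boolean tryPublish(BlockingQueue<String> queue, String item, long timeoutMs)
            throws InterruptedException {
        return queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Hypothetical helper: waits up to timeoutMs for an element.
    // Returns null if the queue stayed empty for the whole timeout.
    public static String tryConsume(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 2 is an assumption chosen so the queue fills up quickly.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        // No consumer is running yet, so every offer after the second times out.
        int dropped = 0;
        for (int i = 0; i < 5; i++) {
            if (!tryPublish(queue, "package-" + i, 10)) {
                dropped++; // the producer decides what to do: drop, retry, log, ...
            }
        }
        System.out.println("queued=" + queue.size() + " dropped=" + dropped);

        // Drain the queue; poll returns null once it has been empty for 10 ms.
        String value;
        while ((value = tryConsume(queue, 10)) != null) {
            System.out.println("consumed " + value);
        }
    }
}
```

For a benchmark like yours, counting the failed offers is one possible way to see how often the generator outruns the engine, instead of letting the producer block silently in put.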

Upvotes: 1
