Laxmikant

Reputation: 1611

What is the best queue cache to use in a concurrent environment in Java?

I need a queue cache of messages in a multi-threaded environment where thousands of requests arrive per second.

Each request thread should pop a message from the queue if it is not empty (and no two threads may ever receive the same message) and decrement a counter in the DB. If the queue is empty, the thread fetches a fixed number of messages (for example 100) from the DB and fills the queue cache while the other threads wait; it then pops one message, decrements the counter in the DB, and returns.

The pop and the counter decrement in the DB must happen together to avoid any inconsistency.

So the requirement makes it clear that the cache will see many read-and-remove (pop) operations and comparatively few write operations (only when the cache is empty).

Right now I have a synchronized method getMessage that holds an ArrayList and performs the operation above (pop if not empty, otherwise fetch and then pop), but I am obviously facing a lot of contention.

If concurrent threads could take different locks for read/remove, and only a write locked the whole cache, my contention would drop.

Which Java cache would be best in this case? Under load I am getting poor performance because of this. Kindly suggest a better approach.

Upvotes: 0

Views: 4756

Answers (2)

Sandeep Salian

Reputation: 441

Instead of synchronized, you can look into ReentrantLock. For your requirement there is a ReentrantReadWriteLock class in Java; refer to the Javadoc for details. Basically, a ReentrantReadWriteLock is recommended when data is accessed by more reader threads than writer threads.

With this implementation, multiple threads can hold the read lock and read the same resource at the same time. A write, however, takes the lock exclusively: no other read or write is allowed while it runs.

There are many examples available online that you can refer to when implementing ReentrantReadWriteLock.

Sample:

package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantReadWrite {
    public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    public static StringBuffer message = new StringBuffer("a");

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Reader(lock, message));
        Thread t2 = new Thread(new WriterA(lock, message));
        Thread t3 = new Thread(new WriterB(lock, message));
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
    }
}


package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Reader implements Runnable
{
    ReentrantReadWriteLock lock = null;
    StringBuffer message = null;

    public Reader(ReentrantReadWriteLock lock, StringBuffer message) {
        this.lock = lock;
        this.message = message;
    }

    public void run()
    {
        for (int i = 0; i <= 10; i++)
        {
            if (lock.isWriteLocked()) {
                System.out.println("I'll take the lock from Write");
            }
            // Acquire before the try block so unlock() only runs if lock() succeeded
            lock.readLock().lock();
            try {
                System.out.println("ReadThread " + Thread.currentThread().getId() + " ---> Message is " + message.toString());
            } finally {
                lock.readLock().unlock();
            }
        }
    }
}


package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriterA implements Runnable
{
    ReentrantReadWriteLock lock = null;
    StringBuffer message = null;

    public WriterA(ReentrantReadWriteLock lock, StringBuffer message) {
        this.lock = lock;
        this.message = message;
    }

    public void run()
    {
        for (int i = 0; i <= 10; i++)
        {
            // Acquire before the try block so unlock() only runs if lock() succeeded
            lock.writeLock().lock();
            try {
                message.append("a");
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}

package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriterB implements Runnable
{
    ReentrantReadWriteLock lock = null;
    StringBuffer message = null;

    public WriterB(ReentrantReadWriteLock lock, StringBuffer message) {
        this.lock = lock;
        this.message = message;
    }

    public void run()
    {
        for (int i = 0; i <= 10; i++)
        {
            // Acquire before the try block so unlock() only runs if lock() succeeded
            lock.writeLock().lock();
            try {
                message.append("b");
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}

Upvotes: 4

LoganMzz

Reputation: 1623

Summary

When you're facing an architecture problem, you should extract a "model" and abstract every external part/system behind an interface. You can then start by making those interfaces wait a fixed time, then a random time, then sampled durations, and so forth.

This makes it clearer how your algorithm/system works and makes it very easy to refactor/adapt until the "model" is satisfactory. As your model becomes clearer, it also becomes easier to insert monitoring and to see what/where you have to monitor. I can add a more complete explanation if necessary.

For your problem, you should separate "cache consuming" from "cache producing" more clearly. You should also use a non-blocking algorithm and push to the "cache" more smoothly. Note that "non-blocking" doesn't mean that your threads will never starve; if there's nothing to process, they will necessarily sleep. However, they shouldn't block each other.

External systems

As mentioned in the summary, external systems must be abstracted, with default values/durations standing in for them. That way you can keep the same scenarios/model and change only the stubbing.

Storage abstraction:

interface Storage<T> {
  /**
   * Fetch next messages.
   * @param target    Target collection to collect data.
   * @param fetchSize Maximum number of records to fetch.
   * @return Number of messages actually fetched.
   */
  int drainTo(Collection<T> target, int fetchSize);
  /**
   * Update message.
   */
  void update(T message);
}

For message processing, we will abstract it as a Java SE 8 Consumer.

Model

Here is an implementation based on what I understood from your question and the related comments:

class Model<T>  {
  private Storage<T>       storage;
  private int              fetchSize;
  private Consumer<T>      processor;
  // Bounded blocking queue shared by all consumer threads
  private BlockingQueue<T> cache;

  /**
   * while active:
   *    Try to poll a message from cache
   *    if none:
   *      Fetch from storage
   *      Put fetched messages to cache
   *    else:
   *      Process message
   *      Update storage according to processed message
   **/
  public void consume(BooleanSupplier active) {
    try {
      while (active.getAsBoolean()) {
        T message = cache.poll();
        if (message == null) {
          // Fetch new messages from storage
          List<T> newMessages = new ArrayList<>(fetchSize);
          synchronized (storage) {
            storage.drainTo(newMessages, fetchSize);
          }
          // Push new messages to cache
          for (T newMessage : newMessages) {
            cache.put(newMessage);
          }
        } else {
          processor.accept(message);
          storage.update(message);
        }
      }
    } catch (InterruptedException e) {
      if (!active.getAsBoolean()) {
        throw new RuntimeException(e);
      }
    }
  }
}

Static analysis

  • If there's nothing to fetch, all threads will eagerly poll the storage.
  • When the cache is empty, all threads except the first are queued on storage access, so their waits accumulate one after another.
  • Worse, cache.put(T) is a blocking operation, and since all the other consumer threads may be waiting for storage access, a deadlock can occur.
  • All threads do the same job whatever the system state, for example fetching from storage when the cache is empty. Use a non-blocking algorithm so that only one thread fetches from the database while the others wait on the cache.
  • The cache transfer must be smoother, pushing messages as soon as they are fetched from the database.
  • Moreover, pushing to the cache mustn't block. When the cache is full, the storage fetch must be aborted so the thread can return to the "cache consumer" role instead of starving as a "blocked cache producer".
  • Waiting for the cache to be exhausted before fetching from storage adds latency: while one thread starts to fetch from storage, all the others starve and then fight eagerly for the very first cached messages.

Dynamic run

I wrote a small system to take measurements, using the following parameters:

  • storage.drainTo is blocking for 33ms
    • fetch size of 100
    • fetch throughput at 3k/s
  • storage.update is blocking for 5ms
  • I run the code for 10 seconds and interrupt all consumer threads
  • I used 4 threads for 4 processors (Intel Core i5-2500K @ 3.3GHz)
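
As a minimal sketch of how such a stub could look (this is not the code used for the measurements): the class name FixedDelayStorage, the Integer message type and the updateCount() helper are assumptions made for illustration. It implements the Storage interface shown earlier and simply sleeps for a fixed time on each call.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Same shape as the Storage interface shown earlier in this answer.
interface Storage<T> {
  int drainTo(Collection<T> target, int fetchSize);
  void update(T message);
}

// Hypothetical stub: stands in for the database by pausing for a fixed time.
class FixedDelayStorage implements Storage<Integer> {
  private final long fetchMillis;
  private final long updateMillis;
  private final AtomicInteger nextId = new AtomicInteger();
  private final AtomicInteger updates = new AtomicInteger();

  FixedDelayStorage(long fetchMillis, long updateMillis) {
    this.fetchMillis = fetchMillis;
    this.updateMillis = updateMillis;
  }

  @Override
  public int drainTo(Collection<Integer> target, int fetchSize) {
    pause(fetchMillis);
    // Always "finds" fetchSize messages; add() throws if target is a full bounded queue.
    for (int i = 0; i < fetchSize; i++) {
      target.add(nextId.getAndIncrement());
    }
    return fetchSize;
  }

  @Override
  public void update(Integer message) {
    pause(updateMillis);
    updates.incrementAndGet();
  }

  // Illustration-only helper to observe how many updates were issued.
  int updateCount() {
    return updates.get();
  }

  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
    }
  }
}

class FixedDelayStorageDemo {
  public static void main(String[] args) {
    Storage<Integer> storage = new FixedDelayStorage(33, 5); // timings from the list above
    List<Integer> batch = new ArrayList<>();
    int fetched = storage.drainTo(batch, 100);
    storage.update(batch.get(0));
    System.out.println("fetched " + fetched + " messages");
  }
}
```

Swapping the fixed delays for random or sampled durations only changes pause(); the model code that consumes the Storage interface stays the same.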

My observations:

  • storage.drainTo
    • Called (per thread): 18 ; 17 ; 18 ; 18 (total=71)
    • Duration (per thread): 1,161ms ; 2,228ms ; 1,159ms ; 1,157ms (total=5,706ms)
    • 7100 messages were virtually fetched from storage
    • Target duration for 17-18 calls is 561-594ms (17 x 33ms to 18 x 33ms)
    • Starvation per thread is: min=563ms ; max=1,667ms ; average=841ms ; sum=3,363ms
    • Starvation represents 8.41% of execution time: 3,363ms / (4 x 10s)
    • Fetch represents 14.26% of execution time
  • storage.update
    • Called (per thread): 1,766 ; 1,530 ; 1,766 ; 1,766 (total=6,828)
    • Duration (per thread): 8,807ms ; 7,631ms ; 8,809ms ; 8,807ms (total=34,054ms)
    • Update represents 85.14% of execution time

99.4% of execution time is spent on storage management. There's no issue with Java queues.
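
The figures above can be re-derived with a few lines of arithmetic. The sketch below assumes that starvation is the measured drainTo duration minus the pure fetch cost (calls x 33ms); the class and method names are made up for illustration. Because the per-thread durations quoted above are rounded, the recomputed starvation sum can differ from the quoted 3,363ms by about a millisecond.

```java
class FirstRunArithmetic {
  static final long FETCH_MILLIS = 33;          // cost of one storage.drainTo call
  static final long TOTAL_MILLIS = 4 * 10_000L; // 4 threads x 10 s

  // Time spent inside drainTo beyond the pure fetch cost, i.e. waiting for storage access.
  static long starvation(long calls, long measuredMillis) {
    return measuredMillis - calls * FETCH_MILLIS;
  }

  // Share of the total execution time, in percent.
  static double share(long millis) {
    return 100.0 * millis / TOTAL_MILLIS;
  }

  public static void main(String[] args) {
    long[] calls = {18, 17, 18, 18};
    long[] durations = {1161, 2228, 1159, 1157};
    long starved = 0;
    for (int i = 0; i < calls.length; i++) {
      starved += starvation(calls[i], durations[i]);
    }
    System.out.printf("starvation: %d ms (%.2f%%)%n", starved, share(starved));
    System.out.printf("fetch:  %.2f%%%n", share(5706));
    System.out.printf("update: %.2f%%%n", share(34054));
  }
}
```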

Improvement

Based on my analysis, I suggest this version:

class ImprovedModel<T>  {
  private Storage<T>  storage;
  private int         fetchSize;
  private Consumer<T> processor;
  private StampedLock lock = new StampedLock();
  // Bounded blocking queue shared by all consumer threads
  private BlockingQueue<T> cache;

  /**
   * while active:
   *    Try to poll a message from cache
   *    if none:
   *      if storage available:
   *        Transfer from storage to cache
   *      else:
   *        Take message from cache
   *    if message:
   *      Process message
   *      Update storage according to processed message
   **/
  public void consume(BooleanSupplier active) {
    try {
      while (active.getAsBoolean()) {
        T message = cache.poll();
        if (message == null) {
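          // Check storage availability: tryWriteLock() returns 0 when the lock is already held
          long stamp = lock.tryWriteLock();
          if (stamp != 0) {
            try {
              storage.drainTo(cache, fetchSize);
            } finally {
              // Only unlock a stamp that was actually acquired
              lock.unlockWrite(stamp);
            }
          } else {
            message = cache.take();
          }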
        }
        if (message != null) {
          processor.accept(message);
          storage.update(message);
        }
      }
    } catch (InterruptedException e) {
      if (!active.getAsBoolean()) {
        throw new RuntimeException(e);
      }
    }
  }
}

Notes:

  • The lock mechanism relies on StampedLock. You may try other implementations, but because cache misses are rare it might not have a meaningful impact.
  • When both queues are empty, only one thread will aggressively loop on storage fetch. The others will just be paused, consuming no CPU at all.
  • To perform well, the storage API must:
    • first fetch a small quantity to quickly deliver messages to waiting "cache consumers"
    • use the offer method in order to terminate on failure (cache full). Thus, the "cache producer" can quickly return to the "cache consumer" state to help the others consume messages.
  • Depending on message processing+update versus storage fetch throughput, it may be a good idea to set the fetch size close to the cache limit.
  • Ideally, you might add a monitoring+heuristic system to dynamically adapt the fetch size and the storage fetch triggering (instead of waiting for the cache to be empty). More simply, you can add a thread which periodically checks for new messages, optionally woken up by a "cache consumer" when the cache is empty.
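
The offer-based transfer mentioned in the notes could look roughly like the sketch below. OfferTransfer and transfer are hypothetical names, and the "fetched" messages are a plain in-memory list; a real Storage implementation would also have to make sure that messages which did not fit in the cache are not marked as fetched.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferTransfer {
  // Pushes messages with offer(), which returns false instead of blocking
  // when a bounded queue is full. Stops at the first rejected message.
  static <T> int transfer(Iterable<T> fetched, BlockingQueue<T> cache) {
    int pushed = 0;
    for (T message : fetched) {
      if (!cache.offer(message)) {
        break; // cache full: abort so the thread can go back to consuming
      }
      pushed++;
    }
    return pushed;
  }

  public static void main(String[] args) {
    BlockingQueue<Integer> cache = new ArrayBlockingQueue<>(3);
    List<Integer> fetched = Arrays.asList(1, 2, 3, 4, 5);
    int pushed = transfer(fetched, cache);
    System.out.println("pushed " + pushed + " of " + fetched.size());
  }
}
```

Unlike put, offer never parks the producing thread, so a full cache ends the transfer instead of leaving the "cache producer" stuck.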

New observations:

  • storage.drainTo
    • Called (per thread): 5 ; 30 ; 29 ; 10 (total=74)
    • Duration (per thread): 164ms ; 990ms ; 957ms ; 330ms (total=2,441ms ; -57.21%)
    • 7400 (+4.23%) messages were virtually fetched from storage
    • Fetch represents 6.10% of execution time
  • cache.take
    • Called (per thread): 69 ; 44 ; 45 ; 64 (total=222)
    • Duration (per thread): 106ms ; 199ms ; 199ms ; 348ms (total=852ms)
    • Take represents 2.13% of execution time
  • storage.update
    • Called (per thread): 1,947 ; 1,761 ; 1,768 ; 1,863 (total=7,339 ; +7.48%)
    • Duration (per thread): 9,698ms ; 8,778ms ; 8,813ms ; 9,286ms (total=36,574ms)
    • Update represents 91.44% of execution time

You can estimate cache.take starvation by comparing the new cache.poll+cache.take throughput with the old cache.poll throughput.

Appendix

  • The simple/poor monitoring I used to measure and export results, including CSV export, is available in BasicMonitor.java. You may also take a look at your IDE profiler, JRockit Mission Control or JAMon / Java Simon.
  • For multithreading support, I have used another small utility class: Executor.java.
  • You may also take a look at JMH.

Upvotes: 0
