Reputation: 1611
I have a requirement for a queue cache of messages in a multi-threaded environment where thousands of requests arrive per second.
Each request thread should pop a message from the queue if it is not empty (and no two threads may ever get the same message) and decrement a counter in the DB. If the queue is empty, the thread should fetch a fixed number of messages (for example 100) from the DB and fill the queue cache (other threads should wait while the queue is being filled), then pop one message, decrement the counter in the DB, and return.
Popping a message and decrementing the DB counter must happen atomically to avoid any inconsistency.
So from the requirement it is clear that the cache will see many read/remove (pop) operations but few write operations (only when the cache is empty).
Right now I have a synchronized method getMessage that holds an ArrayList and performs the above logic (pop if non-empty, otherwise fetch and then pop), but I am obviously facing a lot of contention.
If concurrent threads could take separate locks for read/remove, and only a write had to lock the whole cache, that should reduce my contention issue.
Which Java cache would be best in this case? Under load I am seeing poor performance because of this. Kindly give me a better idea.
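For reference, a minimal sketch of the current approach described above (class and method names such as MessageCache and fetchFromDb are illustrative, not the actual code): one synchronized method guards both the refill and the pop, so every thread contends on the same monitor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the described approach: a single synchronized
// method covers refill, pop, and counter decrement.
public class MessageCache {
    private final Deque<String> queue = new ArrayDeque<>();
    private int dbCounter = 1000; // stands in for the DB-side counter

    public synchronized String getMessage() {
        if (queue.isEmpty()) {
            // Refill from the "DB" while holding the lock: all other
            // threads block here, which is where the contention comes from.
            queue.addAll(fetchFromDb(100));
        }
        String message = queue.pollFirst();
        if (message != null) {
            dbCounter--; // pop + counter decrement stay atomic together
        }
        return message;
    }

    // Stub for the real database fetch.
    private List<String> fetchFromDb(int batchSize) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add("msg-" + (dbCounter - i));
        }
        return batch;
    }

    public synchronized int remaining() {
        return dbCounter;
    }
}
```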
Upvotes: 0
Views: 4756
Reputation: 441
Instead of synchronized, you can look into ReentrantLocks. For your requirement, Java provides the ReentrantReadWriteLock class; you can refer to the Java docs for details. A ReentrantReadWriteLock is recommended in cases where data is accessed by more reader threads than writer threads.
With this implementation, multiple threads can read the same resource without blocking each other, but a single write operation locks the resource exclusively: no other reads or writes are allowed at the same time.
There are many examples available online that you can refer to when implementing ReentrantReadWriteLock.
Sample:
package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantReadWrite {
    // Fair lock: waiting threads acquire it roughly in arrival order.
    public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    public static StringBuffer message = new StringBuffer("a");

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Reader(lock, message));
        Thread t2 = new Thread(new WriterA(lock, message));
        Thread t3 = new Thread(new WriterB(lock, message));
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
    }
}
package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Reader implements Runnable {
    ReentrantReadWriteLock lock = null;
    StringBuffer message = null;

    public Reader(ReentrantReadWriteLock lock, StringBuffer message) {
        this.lock = lock;
        this.message = message;
    }

    public void run() {
        for (int i = 0; i <= 10; i++) {
            if (lock.isWriteLocked()) {
                System.out.println("Write lock is currently held; read will wait for it");
            }
            lock.readLock().lock();
            try {
                System.out.println("ReadThread " + Thread.currentThread().getId()
                        + " ---> Message is " + message.toString());
            } finally {
                lock.readLock().unlock(); // always release, even if println throws
            }
        }
    }
}
package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriterA implements Runnable {
    ReentrantReadWriteLock lock = null;
    StringBuffer message = null;

    public WriterA(ReentrantReadWriteLock lock, StringBuffer message) {
        this.lock = lock;
        this.message = message;
    }

    public void run() {
        for (int i = 0; i <= 10; i++) {
            // Acquire before the try block so finally only runs when the lock is held.
            lock.writeLock().lock();
            try {
                message.append("a");
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}
package concurrency.reentrantreadwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriterB implements Runnable {
    ReentrantReadWriteLock lock = null;
    StringBuffer message = null;

    public WriterB(ReentrantReadWriteLock lock, StringBuffer message) {
        this.lock = lock;
        this.message = message;
    }

    public void run() {
        for (int i = 0; i <= 10; i++) {
            // Acquire before the try block so finally only runs when the lock is held.
            lock.writeLock().lock();
            try {
                message.append("b");
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}
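Applied to the cache from the question, one caveat is worth noting: popping a message mutates the queue, so it needs the write lock; the read lock only helps for non-mutating operations such as checking the size. A hedged sketch under that assumption (class and method names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of a read-write-locked message cache. Reads (size) share the
// lock; pops and refills take it exclusively because they mutate state.
class LockedMessageCache {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final Deque<String> queue = new ArrayDeque<>();

    int size() {
        lock.readLock().lock(); // shared: many threads may read concurrently
        try {
            return queue.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    String pop(List<String> refillIfEmpty) {
        lock.writeLock().lock(); // exclusive: popping mutates the queue
        try {
            if (queue.isEmpty()) {
                queue.addAll(refillIfEmpty); // stands in for the DB fetch
            }
            return queue.pollFirst();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```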
Upvotes: 4
Reputation: 1623
When you face an architecture problem, you should extract a "model" and abstract any external parts/systems behind interfaces. You can then easily start by making those interfaces wait a fixed time, then a random time, then sampled durations, and so forth.
This makes it clearer how your algorithm/system works and makes it very easy to refactor/adapt until the "model" is satisfactory. As your model becomes clearer, it also becomes easier to insert monitoring and to see what/where you have to monitor. I can add a more complete explanation if necessary.
For your problem, you should add a better separation between "cache consuming" and "cache producing". You also have to use a non-blocking algorithm and a smoother push to the "cache". Note that "non-blocking" doesn't mean your threads will never starve; if there is nothing to process, they will necessarily be idle. However, they shouldn't block each other.
As mentioned in the summary, external systems must be abstracted, and default values/spent times must be used. You can then run the same scenarios/model while changing how they are stubbed.
Storage abstraction:
import java.util.Collection;

interface Storage<T> {
    /**
     * Fetch next messages.
     * @param target Target collection to collect data.
     * @param fetchSize Maximum number of records to fetch.
     * @return Number of messages actually fetched.
     */
    int drainTo(Collection<T> target, int fetchSize);

    /**
     * Update message.
     */
    void update(T message);
}
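Following the stubbing advice above, one possible fake backend that waits a fixed time and serves generated messages might look like this (the delay value and "msg-N" format are made up for illustration; the interface is repeated so the snippet compiles on its own):

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

// Interface repeated here so the sketch is self-contained.
interface Storage<T> {
    int drainTo(Collection<T> target, int fetchSize);
    void update(T message);
}

// A stub Storage for experiments: sleeps for a fixed time, then serves
// generated messages, so the model can be measured without a real DB.
class StubStorage implements Storage<String> {
    private final AtomicInteger nextId = new AtomicInteger();
    private final long drainDelayMillis;

    StubStorage(long drainDelayMillis) {
        this.drainDelayMillis = drainDelayMillis;
    }

    @Override
    public int drainTo(Collection<String> target, int fetchSize) {
        try {
            Thread.sleep(drainDelayMillis); // simulated DB latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (int i = 0; i < fetchSize; i++) {
            target.add("msg-" + nextId.getAndIncrement());
        }
        return fetchSize;
    }

    @Override
    public void update(String message) {
        // no-op in the stub; the real system would decrement the DB counter
    }
}
```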
For message processing we will abstract it with the Java 8 SE Consumer interface.
Here is an implementation based on what I have understood from your question and the related comments:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class Model<T> {
    private Storage<T> storage;
    private BlockingQueue<T> cache; // e.g. a LinkedBlockingQueue (this field was implied in the original)
    private int fetchSize;
    private Consumer<T> processor;

    /**
     * while active:
     *   Try to poll a message from cache
     *   if none:
     *     Fetch from storage
     *     Put fetched messages to cache
     *   else:
     *     Process message
     *     Update storage according to processed message
     **/
    public void consume(BooleanSupplier active) {
        try {
            while (active.getAsBoolean()) {
                T message = cache.poll();
                if (message == null) {
                    // Fetch new messages from storage
                    List<T> newMessages = new ArrayList<>(fetchSize);
                    synchronized (storage) {
                        storage.drainTo(newMessages, fetchSize);
                    }
                    // Push new messages to cache
                    for (T newMessage : newMessages) {
                        cache.put(newMessage);
                    }
                } else {
                    processor.accept(message);
                    storage.update(message);
                }
            }
        } catch (InterruptedException e) {
            if (!active.getAsBoolean()) {
                throw new RuntimeException(e);
            }
        }
    }
}
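The consume loop above is driven by a BooleanSupplier. A minimal sketch of wiring that up with worker threads and a shared shutdown flag (names such as ConsumerDriver and runWorkers are assumed, not from the answer; the loop body just counts iterations where the real code would poll/fetch/process):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Drives consume-style loops with a shared flag, the way
// Model.consume(BooleanSupplier) expects to be called.
public class ConsumerDriver {

    static int runWorkers(long millis) {
        AtomicBoolean running = new AtomicBoolean(true);
        BooleanSupplier active = running::get;  // what consume() would receive
        AtomicInteger iterations = new AtomicInteger();

        Runnable worker = () -> {
            while (active.getAsBoolean()) {     // same exit condition as consume()
                iterations.incrementAndGet();
            }
        };

        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start();
        t2.start();
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
            running.set(false);                 // all workers observe the flag and exit
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return iterations.get();
    }

    public static void main(String[] args) {
        System.out.println("workers ran " + runWorkers(50) + " iterations");
    }
}
```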
Note that cache.put(T) is a blocking operation: if the cache fills up while all other consumer threads are waiting for storage access, a deadlock will occur.
I have written a small system to take measurements, with the following parameters:
- storage.drainTo blocks for 33 ms
- storage.update blocks for 5 ms
My observations:
- storage.drainTo: 3,363 ms / (4 x 10 s)
- storage.update
- 99.4% of execution time is spent on storage management. There's no issue with Java queues.
Based on my analysis, I suggest this version:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class ImprovedModel<T> {
    private Storage<T> storage;
    private BlockingQueue<T> cache; // e.g. a LinkedBlockingQueue (this field was implied in the original)
    private int fetchSize;
    private Consumer<T> processor;
    private StampedLock lock = new StampedLock();

    /**
     * while active:
     *   Try to poll a message from cache
     *   if none:
     *     if storage available:
     *       Transfer from storage to cache
     *     else:
     *       Take message from cache
     *   if message:
     *     Process message
     *     Update storage according to processed message
     **/
    public void consume(BooleanSupplier active) {
        try {
            while (active.getAsBoolean()) {
                T message = cache.poll();
                if (message == null) {
                    // Check storage availability (tryWriteLock returns 0 when the lock is held)
                    long stamp = lock.tryWriteLock();
                    try {
                        if (stamp != 0) {
                            storage.drainTo(cache, fetchSize);
                        } else {
                            message = cache.take();
                        }
                    } finally {
                        if (stamp != 0) {
                            lock.unlockWrite(stamp); // only unlock if we actually acquired it
                        }
                    }
                }
                if (message != null) {
                    processor.accept(message);
                    storage.update(message);
                }
            }
        } catch (InterruptedException e) {
            if (!active.getAsBoolean()) {
                throw new RuntimeException(e);
            }
        }
    }
}
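The tryWriteLock idiom used above is easy to get wrong, so here is a tiny standalone demonstration of its contract (StampedDemo is an illustrative name): a stamp of 0 means the write lock was not acquired, so unlockWrite must only be called with a non-zero stamp.

```java
import java.util.concurrent.locks.StampedLock;

// Demonstrates the StampedLock.tryWriteLock contract: 0 = not acquired.
// Calling unlockWrite with an invalid stamp throws IllegalMonitorStateException.
public class StampedDemo {
    public static void main(String[] args) {
        StampedLock lock = new StampedLock();

        long first = lock.tryWriteLock();  // uncontended: non-zero stamp
        long second = lock.tryWriteLock(); // already held (not reentrant): 0

        System.out.println(first != 0);    // true
        System.out.println(second == 0);   // true

        if (first != 0) {
            lock.unlockWrite(first);       // release only what we acquired
        }
    }
}
```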
Notes:
- The storage guard is a StampedLock. You may try other lock implementations, but due to the low cache-miss rate it might not have a meaningful impact.
- Messages are pushed to the cache with the offer method so that the call terminates on failure (cache full). That way a "cache producer" can quickly return to its "cache consumer" state and help the others consume messages.
New observations:
- storage.drainTo
- cache.take
- storage.update
You may deduce the cache.take starvation by comparing the new cache.poll + cache.take throughput against the old cache.poll throughput.
Upvotes: 0