Reputation: 1198
I am trying to implement an asynchronous return for a time-consuming task. The task data is inserted into a queue (Q) and the call returns immediately. A pool of 20 threads takes tasks from that queue. But the data the thread receives is null, and I wonder why.
Code segments (only the relevant parts of the code)
Definition
private static LinkedList<Object[]> cacheQ = new LinkedList<Object[]>();
private static ArrayList<Thread> cacheThreads = null;
One time initialization
if (cacheThreads == null) {
cacheThreads = new ArrayList<Thread>();
for (i = 0; i < 20; i++) {
CacheThread cacheThread = readWriteOps.new CacheThread();
Thread thread = new Thread(cacheThread);
cacheThreads.add(i, thread);
thread.start();
}
System.out.println("CAche Threads Started!");
}
Adding a new task to the Q at the beginning (addFirst)
public void arrayizeToCache(String key, CacheData value) {
synchronized (cacheThreads) {
Object[] tuple = new Object[SIZE] ;
tuple[KEY] = key ;
tuple[VALUE] = value ;
log.debug("CacheThread arrayizeToCache k"+key+"v"+value) ; // value is NOT NULL here
cacheQ.addFirst((Object[])tuple);
cacheThreads.notify();
}
}
The actual work of the thread
public class CacheThread implements Runnable {
@Override
public void run() {
System.out.println("CachedThread running!");
CacheData cacheStore = null;
while (true) {
try {
String key;
synchronized (cacheThreads) {
if (cacheQ.isEmpty()) {
log.debug("Cache Q waiting");
cacheThreads.wait();
}
if (cacheQ.isEmpty()) {
log.error("Cache List empty nothing to cache");
continue;
}
Object[] cacheData = (Object[]) cacheQ.removeLast();
key = (String) cacheData[KEY] ;
cacheStore = (CacheData) cacheData[VALUE];
// ----- HERE -----
//More code, but irrelevant for this question.
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// log.debug(((Runnable) this).toString() +
// "Server : flush SUCCESS");
}
}
}
At ----- HERE ----- I am getting null from cacheStore.getValue(). What am I missing here? It is probably something basic.
I have used a LinkedList for the Q, so structurally it should work. The LinkedList holds the data that I need to process. It holds an Object array since I have more than one data element.
Edit :
private static final int KEY = 0 ;
private static final int VALUE = 1 ;
private static final int SIZE = 2 ;
Edit Again : This is how I am calling arrayize
ReadWriteOperations.getInstance(this).arrayizeToCache(key,
new CacheData(subscription, SuperCache.NEVER_EXPIRE));
My CacheData object
public class CacheData {
private Object value ;
private Integer expiry = 0 ;
public CacheData(Object newValue) {
value = newValue ;
}
public CacheData (Object newValue, Integer newExpiry) {
value = newValue ; // THIS LINE WAS MISSING
expiry = newExpiry ;
}
public Object getValue() {
return value ;
}
public Integer getExpiry () {
return expiry ;
}
}
Answer : I was not initializing value at all. I kind of assumed that it would be something more complex. And it's 2am here :) Thanks to @Gray.
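One way to guard against this kind of slip is sketched below. It is a simplified stand-in for CacheData, not the original class: the wrapper class CacheDataDemo is hypothetical, and the fields are made final as an assumption. With final fields, the compiler rejects any constructor that forgets to assign value, and delegating the one-argument constructor to the two-argument one keeps the assignments in a single place.

```java
public class CacheDataDemo {
    // Simplified stand-in for the CacheData class from the question.
    static class CacheData {
        // final fields: a constructor that skips an assignment will not compile.
        private final Object value;
        private final Integer expiry;

        CacheData(Object newValue) {
            // Delegate so that both fields are assigned in exactly one constructor.
            this(newValue, 0);
        }

        CacheData(Object newValue, Integer newExpiry) {
            value = newValue;
            expiry = newExpiry;
        }

        Object getValue() {
            return value;
        }

        Integer getExpiry() {
            return expiry;
        }
    }

    public static void main(String[] args) {
        CacheData data = new CacheData("subscription", 60);
        System.out.println(data.getValue());
        System.out.println(data.getExpiry());
    }
}
```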
Upvotes: 0
Views: 405
Reputation: 116878
Your code looks fine to me. I would be interested to know:
… synchronized (cacheThreads) { block for the changes to be visible to the running threads. Here are some additional comments about the code:
You are forking your own threads and then using a synchronized LinkedList for the jobs. I would encourage you to look into the ExecutorService patterns, which do both of those for you. They are definitely recommended for most threading tasks.
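As a rough sketch of what that could look like, assuming a fixed pool of 20 threads to match the question: the class name ExecutorCacheSketch, the runDemo method, and the ConcurrentHashMap standing in for the real cache are all hypothetical. The pool owns both the worker threads and the task queue, so no explicit wait()/notify() is needed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorCacheSketch {

    // Submits `tasks` cache writes to a 20-thread pool and returns the filled store.
    static Map<String, String> runDemo(int tasks) {
        // Hypothetical stand-in for the real cache backend.
        Map<String, String> store = new ConcurrentHashMap<>();
        // The pool manages its own worker threads and internal task queue.
        ExecutorService pool = Executors.newFixedThreadPool(20);

        for (int i = 0; i < tasks; i++) {
            final int n = i;
            // execute() returns immediately; the write runs on a pool thread.
            pool.execute(() -> store.put("key" + n, "value" + n));
        }

        // Stop accepting new tasks, then wait for the queued ones to finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100).size());
    }
}
```

In a long-running server the pool would normally be created once and kept alive; it is shut down here only so that the demo terminates.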
If you don't move to an ExecutorService, you should switch your cacheQ queue to be a BlockingQueue, which takes care of the synchronized, notify()/wait(), etc. for you.
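A minimal sketch of the BlockingQueue variant, keeping hand-started threads as in the question, might look like the following. The class name BlockingQueueSketch, the runDemo method, the String[] tuples, the ConcurrentHashMap store, and the POISON shutdown marker are all assumptions made for illustration. take() blocks until an element is available, which replaces the explicit synchronized block and the wait()/notify() calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueSketch {
    static final int WORKERS = 20;
    // Hypothetical sentinel object that tells a worker to exit.
    private static final String[] POISON = new String[0];

    static Map<String, String> runDemo(int tasks) {
        BlockingQueue<String[]> cacheQ = new LinkedBlockingQueue<>();
        // Hypothetical stand-in for the real cache backend.
        Map<String, String> store = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < WORKERS; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        // Blocks until a tuple is available; no wait()/notify() needed.
                        String[] tuple = cacheQ.take();
                        if (tuple == POISON) {
                            return;
                        }
                        store.put(tuple[0], tuple[1]);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            threads.add(worker);
        }

        // Producer side: add() returns immediately on an unbounded queue.
        for (int i = 0; i < tasks; i++) {
            cacheQ.add(new String[] {"key" + i, "value" + i});
        }
        // One marker per worker; the queue is FIFO, so all tasks are taken first.
        for (int i = 0; i < WORKERS; i++) {
            cacheQ.add(POISON);
        }
        for (Thread worker : threads) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100).size());
    }
}
```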
Upvotes: 1