taxeeta
taxeeta

Reputation: 1198

Java threading issue, I am getting a null, while executing a async task

I am trying to implement a async return for a time consuming task. So task data is inserted in a Q, and return immediately. A bunch of 20 threads take tasks from that data queue. But the data in thread is getting null, I wonder why.

Code segment's (only relevant parts of code)

Definition

private static LinkedList<Object[]> cacheQ = new LinkedList<Object[]>();
private static ArrayList<Thread> cacheThreads = null;

One time initialization

if (cacheThreads == null) {
        cacheThreads = new ArrayList<Thread>();
        for (i = 0; i < 20; i++) {
            CacheThread cacheThread = readWriteOps.new CacheThread();
            Thread thread = new Thread(cacheThread);
            cacheThreads.add(i, thread);
            thread.start();
        }
        System.out.println("CAche Threads Started!");
    }

Adding a new task to the Q at the beginning (addFirst)

public void arrayizeToCache(String key, CacheData value) {
    synchronized (cacheThreads) {
        Object[] tuple = new Object[SIZE] ;
        tuple[KEY] = key ;
        tuple[VALUE] = value ;
----NOT NULL----log.debug("CacheThread arrayizeToCache k"+key+"v"+value) ;
        cacheQ.addFirst((Object[])tuple);
        cacheThreads.notify();
    }
}

The actual work of the thread

public class CacheThread implements Runnable {
    @Override
    public void run() {
        System.out.println("CachedThread running!");
        CacheData cacheStore = null;
        while (true) {
            try {
                String key;
                synchronized (cacheThreads) {
                    if (cacheQ.isEmpty()) {
                        log.debug("Cache Q waiting");
                        cacheThreads.wait();
                    }
                    if (cacheQ.isEmpty()) {
                        log.error("Cache List empty nothing to cache");
                        continue;
                    }
                    Object[] cacheData = (Object[]) cacheQ.removeLast();
                    key = (String) cacheData[KEY] ;
                    cacheStore = (CacheData) cacheData[VALUE];
----- HERE -----
//More code, but irrelevant for this question. 
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // log.debug(((Runnable) this).toString() +
            // "Server : flush SUCCESS");
        }
    }
}

At -----HERE----- I am getting a null for cacheStore.getValue(). What am I missing here. Something basic.

I have used a LinkedList, for the Q, so structurally it should work. The linkedlist holds the data which I need to process. It holds a Object array since I have more than 1 data element.

Edit :

private static final int KEY = 0 ;
private static final int VALUE = 1 ;
private static final int SIZE = 2 ;

Edit Again : This is how I am calling arrayize

ReadWriteOperations.getInstance(this).arrayizeToCache(key,
            new CacheData(subscription, SuperCache.NEVER_EXPIRE));

My CacheData object

public class CacheData {
private Object value ;
private Integer expiry = 0 ;
public CacheData(Object newValue) {
    value = newValue ;
}
public CacheData (Object newValue, Integer newExpiry) {
// THIS LINE WAS MISSING        value = newValue ;
    expiry = newExpiry ;
}
public Object getValue() {
    return value ;
}
public Integer getExpiry () {
    return expiry ;
}
}

Answer : I was initializing value at all. Kinda assumed that it will be a more complex thing. And its 2am here :) Thanks to @Gray.

Upvotes: 0

Views: 405

Answers (1)

Gray
Gray

Reputation: 116878

Your code looks fine to me. I would be interested to know:

  • Where is cacheStore's value updated?
  • Is it updated after the threads are started? This may be the problem.
  • If so, it needs to be updated inside of a synchronized (cacheThreads) { block for the changes to be visible to the running threads.

Here are some additional comments about the code:

  • You are forking your own threads and then having a synchronized LinkedList for the jobs. I would encourage you to look into the ExecutorService patterns that do both of those for. Definitely recommended for most threading tasks.

  • If you don't move to an ExecutorService, you should switch your cacheQ queue to be a BlockingQueue which takes care of the synchronized, notify()/wait(), etc. for you.

Upvotes: 1

Related Questions