James Raitsev

Reputation: 96531

Threaded BlockingQueue, clarification needed

When working with a BlockingQueue, I implemented the following logic to read from it until told otherwise. Unfortunately, the read loop intermittently fails to exit (the "I am out" message below is not always printed).

The problem:

As part of the QThread class, I declare:

    public static volatile boolean shouldContinueReading = true;

The run method (confirmed to be executing) contains:

    while (shouldContinueReading) {
        try {

            String retrieved = qIn.poll(2, TimeUnit.MILLISECONDS);
            if (retrieved != null)
                consume(retrieved);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    System.out.println("I am out");  // <-- not always seen
    if (qIn.remainingCapacity() > 0) {
        try {
            consume(qIn.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Meanwhile, in another thread, shouldContinueReading is set to false once a certain condition is met:

    while (stillReading) {
        // do nothing
    }

    QThread.shouldContinueReading = false;

Update: problem resolved

It turns out the problem lies a bit further down:

    private void consume(String take) {
        // some processing

        produce(newData.toString());
    }

    private void produce(String newData) {
        System.out.println(newData);
        try {
            qOut.put(newData);          // <-- Problem is here. Should use offer instead of put
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Both qIn (queue in) and qOut (queue out) are declared as:

    private volatile BlockingQueue<String> qIn;
    private volatile BlockingQueue<String> qOut;

The objects themselves are created elsewhere as follows and passed down to the constructor:

    BlockingQueue<String> q1 = new SynchronousQueue<String>();
    BlockingQueue<String> q2 = new SynchronousQueue<String>();

    QThread qThread = new QThread(q1, q2);
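
For context on why put can stall here: a SynchronousQueue has no internal capacity, so put blocks until another thread is ready to take the element, whereas a timed offer gives up and returns false. The following is only a minimal sketch of that difference, not the actual QThread code; HandoffDemo and tryHandOff are hypothetical names, and the timeouts are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffDemo {

    // Hypothetical helper: tries to hand an item to a consumer and
    // returns true only if one accepted it within the timeout.
    static boolean tryHandOff(BlockingQueue<String> queue, String item, long timeoutMillis) {
        try {
            return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> qOut = new SynchronousQueue<>();

        // Nobody is taking from qOut yet, so the timed offer gives up
        // instead of blocking indefinitely the way put would.
        System.out.println("no consumer:   " + tryHandOff(qOut, "first", 50));

        // Start a consumer that takes exactly one element.
        Thread consumer = new Thread(() -> {
            try {
                qOut.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The offer now has up to one second to meet the waiting consumer.
        System.out.println("with consumer: " + tryHandOff(qOut, "second", 1000));
        consumer.join();
    }
}
```

With offer, the caller has to decide what to do when the hand-off fails (retry, drop, or log); that policy is not specified anywhere in the code above.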

Any suggestions? What should I do with qOut? Am I not declaring it correctly?

Upvotes: 0

Views: 185

Answers (1)

Enno Shioji

Reputation: 26882

I bet QThread.shouldContinueReading = false; isn't always getting executed, or the reading thread isn't executing in the first place. That is, the problem you are seeing is likely somewhere upstream, not here. The first thing I'd do is pin down exactly where the problem lies, with 100% confidence (add some more print statements).

Apart from that, I'd recommend using the thread interruption mechanism instead of rolling your own flag, especially if this is production code. Interruption is itself just a glorified flag, but it also affects third-party code such as BlockingQueue, which can make the implementation simpler and even more efficient.
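
A rough sketch of what the interruption-based version might look like. This is not your QThread: InterruptibleReader, consumed, and runDemo are made-up names, "consuming" just records the string, and I'm assuming a LinkedBlockingQueue so that leftover elements can be drained after the interrupt.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptibleReader implements Runnable {
    private final BlockingQueue<String> qIn;
    // Stand-in for real processing: remembers every element that was read.
    private final List<String> consumed = new CopyOnWriteArrayList<>();

    public InterruptibleReader(BlockingQueue<String> qIn) {
        this.qIn = qIn;
    }

    @Override
    public void run() {
        try {
            // No polling timeout and no shared flag: take() blocks until an
            // element arrives and throws InterruptedException on interrupt.
            while (!Thread.currentThread().isInterrupted()) {
                consumed.add(qIn.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag, then fall through
        }
        // Drain whatever is still queued without blocking.
        String leftover;
        while ((leftover = qIn.poll()) != null) {
            consumed.add(leftover);
        }
    }

    public List<String> consumed() {
        return consumed;
    }

    // Hypothetical driver: feeds two elements, then asks the reader to stop.
    static List<String> runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        InterruptibleReader reader = new InterruptibleReader(queue);
        Thread thread = new Thread(reader);
        thread.start();
        queue.add("a");
        queue.add("b");
        thread.interrupt(); // replaces QThread.shouldContinueReading = false
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reader.consumed();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The stopping thread calls thread.interrupt() instead of writing to a static field, so the reader wakes up immediately even if it is blocked inside the queue.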

Upvotes: 1
