Evan LaHurd

Reputation: 977

Using a PriorityBlockingQueue to feed in logged objects for processing

I have an application that reads in objects from multiple serialized object logs and hands them off to another class for processing. My question focuses on how to efficiently and cleanly read in the objects and send them off.

The code was pulled from an older version of the application, but we ended up keeping it as is. It hasn't really been used much until the past week, but I recently started looking at the code more closely to try and improve it.

It opens N ObjectInputStreams, and reads one object from each stream to store them in an array (assume inputStreams below is just an array of ObjectInputStream objects that corresponds to each log file):

for (int i = 0; i < logObjects.length; i++) {
    if (inputStreams[i] == null) {
        // this log has already reached EOF
        continue;
    }
    try {
        if (logObjects[i] == null) {
            // refill this log's slot with its next object
            logObjects[i] = (LogObject) inputStreams[i].readObject();
        }
    } catch (final InvalidClassException e) {
        LOGGER.warn("Invalid object read from " + logFileList.get(i).getAbsolutePath(), e);
    } catch (final EOFException e) {
        // end of this log; stop reading from it
        inputStreams[i] = null;
    }
}

The objects that were serialized to file are LogObject objects. Here is the LogObject class:

public class LogObject implements Serializable {

    private static final long serialVersionUID = -5686286252863178498L;

    private Object logObject;
    private long logTime;

    public LogObject(Object logObject) {
        this.logObject = logObject;
        this.logTime = System.currentTimeMillis();
    }

    public Object getLogObject() {
        return logObject;
    }

    public long getLogTime() {
        return logTime;
    }
}

Once the objects are in the array, it then compares the log time and sends off the object with the earliest time:

// handle the LogObject with the earliest log time
minTime = Long.MAX_VALUE;
for (int i = 0; i < logObjects.length; i++) {
    logObject = logObjects[i];
    if (logObject == null) {
        continue;
    }
    if (logObject.getLogTime() < minTime) {
        index = i;
        minTime = logObject.getLogTime();
    }
}

handler.handleOutput(logObjects[index].getLogObject());

My first thought was to create a thread for each file that reads in and puts the objects in a PriorityBlockingQueue (using a custom comparator that uses the LogObject log time to compare). Another thread could then be taking the values out and sending them off.
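The queue described above could be sketched roughly as follows. This is a minimal, self-contained illustration (the `LogObject` stand-in, the `drainInTimeOrder` helper, and the initial capacity are all invented for the example, not taken from the application); it only shows that a `PriorityBlockingQueue` with a log-time comparator hands back entries earliest-first:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueSketch {

    // Minimal stand-in for the question's LogObject.
    public static class LogObject {
        public final Object payload;
        public final long logTime;

        public LogObject(Object payload, long logTime) {
            this.payload = payload;
            this.logTime = logTime;
        }

        public long getLogTime() {
            return logTime;
        }
    }

    // Drain a time-ordered queue: poll() always returns the earliest
    // entry *currently* in the queue -- not necessarily the globally
    // earliest entry, which is exactly the race the question describes.
    public static List<Object> drainInTimeOrder(LogObject... entries) {
        PriorityBlockingQueue<LogObject> queue = new PriorityBlockingQueue<>(
                Math.max(1, entries.length),
                Comparator.comparingLong(LogObject::getLogTime));
        Collections.addAll(queue, entries);
        List<Object> out = new ArrayList<>();
        for (LogObject head; (head = queue.poll()) != null; ) {
            out.add(head.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainInTimeOrder(
                new LogObject("b", 200L),
                new LogObject("a", 100L),
                new LogObject("c", 300L)));
        // -> [a, b, c]
    }
}
```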

The only issue here is that one thread could put an object on the queue and have it taken off before another thread could put one on that may have an earlier time. This is why the objects were read in and stored in an array initially before checking for the log time.

Does this constraint prohibit me from implementing a multi-threaded design? Or is there a way I can tweak my solution to make it more efficient?

Upvotes: 1

Views: 86

Answers (2)

Denis Bazhenov

Reputation: 9955

As far as I understand the problem, you need to process LogObjects strictly in order. In that case the initial part of your code is correct: what it does is a merge sort over several input streams. You read one object from each stream (which is why the temporary array is needed), then take the appropriate (minimum) LogObject and hand it to the processor.

Depending on your context, you may be able to do the processing itself in several threads. The only change needed is to put the LogObjects into an ArrayBlockingQueue, with processors running on several independent threads. Another option is to submit the LogObjects for processing to a ThreadPoolExecutor. The latter option is simpler and more straightforward.
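The ThreadPoolExecutor option might look something like the sketch below. The class and names (`DispatchSketch`, `dispatch`, the `"processed:"` prefix, and the pool size of 4) are illustrative assumptions, not the application's actual code; the point is that tasks are *submitted* in timestamp order while completion order is left up to the pool, which is precisely the pitfall noted below:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {

    // A single reader dispatches payloads in timestamp order; a worker
    // pool processes them in parallel. Only the *start* (dispatch) order
    // is guaranteed -- completion order is undefined.
    public static List<String> dispatch(List<String> orderedPayloads) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> results = new CopyOnWriteArrayList<>();
        for (String p : orderedPayloads) {
            // submission order == the loop's (timestamp) order
            pool.submit(() -> results.add("processed:" + p));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(List.of("t1", "t2", "t3")));
    }
}
```

Note that the assertion one can safely make about `results` is its contents, not its order.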

But be aware of several pitfalls on the way:

  • for this algorithm to work correctly, the individual streams must already be sorted; otherwise your program is broken;
  • when you process in parallel, the message processing order is, strictly speaking, undefined. The proposed algorithms only guarantee the order in which processing *starts* (dispatch order). That might not be what you want.

So now you should face several questions:

  1. Is processing order really required?
  2. If so, is a global order required (over all messages), or a local one (over independent groups of messages)?

The answers to those questions will have a great impact on your ability to do parallel processing.

If the answer to the first question is yes then, sadly, parallel processing is not an option.

Upvotes: 1

user207421

Reputation: 310980

I agree with you. Throw this away and use a PriorityBlockingQueue.

The only issue here is that if Thread 1 has read an object from File 1 and put it in the queue (and the object File 2 was going to read in has an earlier log time), the reading thread could take it and send it off, resulting in a log object with a later time being sent first.

This is exactly like the merge phase of a balanced merge (Knuth, TAOCP vol. 3). You must read the next input from the same file that the previous lowest element came from.
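That refill rule is the heart of the question's existing loop, and it can be sketched in isolation. The following is a simplified k-way merge over plain sorted iterators (the `MergeSketch` class and the use of `Integer` instead of `LogObject` are assumptions made for the example); the key line is the one that refills only the slot the minimum was taken from:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MergeSketch {

    // k-way merge of individually sorted streams. Mirrors the question's
    // structure: one "head" element per stream, take the minimum, then
    // refill ONLY from the stream the minimum came from.
    public static List<Integer> merge(List<Iterator<Integer>> streams) {
        int n = streams.size();
        Integer[] heads = new Integer[n];
        for (int i = 0; i < n; i++) {
            heads[i] = streams.get(i).hasNext() ? streams.get(i).next() : null;
        }
        List<Integer> out = new ArrayList<>();
        while (true) {
            // scan for the smallest head (the question's minTime loop)
            int index = -1;
            int min = Integer.MAX_VALUE;
            for (int i = 0; i < n; i++) {
                if (heads[i] != null && heads[i] < min) {
                    min = heads[i];
                    index = i;
                }
            }
            if (index < 0) {
                break; // every stream is exhausted
            }
            out.add(heads[index]);
            // the crucial step: refill from the SAME stream we just consumed
            heads[index] = streams.get(index).hasNext()
                    ? streams.get(index).next() : null;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(
                List.of(1, 4, 7).iterator(),
                List.of(2, 5).iterator(),
                List.of(3, 6).iterator())));
        // -> [1, 2, 3, 4, 5, 6, 7]
    }
}
```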

Does this constraint prohibit me from implementing a multi-threaded design?

It isn't a constraint. It's imaginary.

Or is there a way I can tweak my solution to make it more efficient?

Priority queues are already pretty efficient. In any case you should certainly worry about correctness first. Then add buffering ;-) Wrap the ObjectInputStreams around BufferedInputStreams, and ensure there is a BufferedOutputStream in your output stack.
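The buffering suggestion amounts to inserting a `BufferedInputStream`/`BufferedOutputStream` between the object stream and the file stream. A minimal round-trip sketch (the `BufferSketch` class, the temp-file name, and the `roundTrip` helper are all invented for illustration):

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BufferSketch {

    // Serialize a value to a temp file and read it back, with buffering
    // on both sides so each readObject() doesn't hit the disk directly.
    public static Object roundTrip(Serializable value) {
        try {
            File file = File.createTempFile("logsketch", ".bin");
            file.deleteOnExit();
            try (ObjectOutputStream out = new ObjectOutputStream(
                    new BufferedOutputStream(new FileOutputStream(file)))) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new BufferedInputStream(new FileInputStream(file)))) {
                return in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```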

Upvotes: 0
