Reputation: 977
I have an application that reads in objects from multiple serialized object logs and hands them off to another class for processing. My question focuses on how to efficiently and cleanly read in the objects and send them off.
The code was pulled from an older version of the application, but we ended up keeping it as is. It hasn't really been used much until the past week, but I recently started looking at the code more closely to try and improve it.
It opens N ObjectInputStreams and reads one object from each stream to store them in an array (assume inputStreams below is just an array of ObjectInputStream objects, one corresponding to each log file):
    for (int i = 0; i < logObjects.length; i++) {
        if (inputStreams[i] == null) {
            continue;
        }
        try {
            if (logObjects[i] == null) {
                logObjects[i] = (LogObject) inputStreams[i].readObject();
            }
        } catch (final InvalidClassException e) {
            LOGGER.warn("Invalid object read from " + logFileList.get(i).getAbsolutePath(), e);
        } catch (final EOFException e) {
            inputStreams[i] = null;
        }
    }
The objects that were serialized to file are LogObject objects. Here is the LogObject class:
    public class LogObject implements Serializable {

        private static final long serialVersionUID = -5686286252863178498L;

        private Object logObject;
        private long logTime;

        public LogObject(Object logObject) {
            this.logObject = logObject;
            this.logTime = System.currentTimeMillis();
        }

        public Object getLogObject() {
            return logObject;
        }

        public long getLogTime() {
            return logTime;
        }
    }
Once the objects are in the array, it then compares the log time and sends off the object with the earliest time:
    // handle the LogObject with the earliest log time
    minTime = Long.MAX_VALUE;
    for (int i = 0; i < logObjects.length; i++) {
        logObject = logObjects[i];
        if (logObject == null) {
            continue;
        }
        if (logObject.getLogTime() < minTime) {
            index = i;
            minTime = logObject.getLogTime();
        }
    }
    handler.handleOutput(logObjects[index].getLogObject());
My first thought was to create a thread for each file that reads in the objects and puts them in a PriorityBlockingQueue (using a custom comparator that compares LogObjects by log time). Another thread could then be taking the values out and sending them off.
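For reference, such a comparator can be built directly from the existing getter. Here is a minimal sketch; the nested LogObject is a stand-in for the class above (with the timestamp injectable for demonstration), and earliestOf is a hypothetical helper used only for illustration:

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueSketch {

    // Minimal stand-in for the LogObject class shown above.
    static class LogObject implements Serializable {
        private final Object logObject;
        private final long logTime;

        LogObject(Object logObject, long logTime) {
            this.logObject = logObject;
            this.logTime = logTime;
        }

        Object getLogObject() { return logObject; }
        long getLogTime() { return logTime; }
    }

    // Puts the entries on a priority queue ordered by log time and
    // takes the head, which is the entry with the earliest time.
    static Object earliestOf(LogObject... entries) throws InterruptedException {
        // The initial capacity (11 is the documented default) must be
        // given explicitly when a comparator is supplied.
        PriorityBlockingQueue<LogObject> queue = new PriorityBlockingQueue<>(
                11, Comparator.comparingLong(LogObject::getLogTime));
        for (LogObject entry : entries) {
            queue.put(entry);
        }
        return queue.take().getLogObject();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(earliestOf(
                new LogObject("later", 200L),
                new LogObject("earlier", 100L))); // prints "earlier"
    }
}
```

Note that the queue only orders the elements that are currently in it; it cannot know about an earlier element that has not been put on yet, which is exactly the concern below.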
The only issue here is that one thread could put an object on the queue and have it taken off before another thread could put one on that may have an earlier time. This is why the objects were read in and stored in an array initially before checking for the log time.
Does this constraint prohibit me from implementing a multi-threaded design? Or is there a way I can tweak my solution to make it more efficient?
Upvotes: 1
Views: 86
Reputation: 9955
As far as I understand your problem, you need to process LogObjects strictly in order. In that case the initial part of your code is totally correct. What this code does is a merge of several input streams: you read one object from each stream (this is why the temporary array is needed), then take the appropriate (minimum/maximum) LogObject and hand it to the processor.
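That merge can be sketched as follows; in-memory iterators stand in for the ObjectInputStreams, a returned list stands in for handler.handleOutput, and all class and method names here are illustrative, not from the original code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeSketch {

    // Pairs a log time with the index of the source it came from, so
    // the merge knows which source to refill from after handing it off.
    record Entry(long logTime, int source) {}

    // k-way merge: seed the heap with one entry per source, then
    // repeatedly take the minimum and refill from the same source.
    static List<Long> merge(List<Iterator<Long>> sources) {
        PriorityQueue<Entry> heap =
                new PriorityQueue<>(Comparator.comparingLong(Entry::logTime));
        for (int i = 0; i < sources.size(); i++) {
            if (sources.get(i).hasNext()) {
                heap.add(new Entry(sources.get(i).next(), i));
            }
        }
        List<Long> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Entry min = heap.poll();
            out.add(min.logTime()); // handler.handleOutput(...) would go here
            Iterator<Long> source = sources.get(min.source());
            if (source.hasNext()) {
                heap.add(new Entry(source.next(), min.source()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Iterator<Long>> sources = List.of(
                List.of(1L, 4L, 7L).iterator(),
                List.of(2L, 3L, 9L).iterator());
        System.out.println(merge(sources)); // [1, 2, 3, 4, 7, 9]
    }
}
```

The essential property is that after emitting the minimum, the next read comes from the same source, so no source can have an unseen earlier element.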
Depending on your context you might be able to do the processing in several threads. The only thing you need to change is to put the LogObjects in an ArrayBlockingQueue, and the processors can run on several independent threads. Another option is to send the LogObjects for processing to a ThreadPoolExecutor. The last option is simpler and more straightforward.
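A minimal sketch of the executor option, assuming the processing step can be wrapped in a task (the payload and pool size here are illustrative; note that tasks submitted to a pool may complete in any order, which matters if strict ordering is required):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorSketch {

    public static void main(String[] args) throws Exception {
        // A fixed pool of worker threads; tasks are picked up in
        // submission order but may finish in any order.
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Each LogObject's processing would be submitted as a task.
        Future<String> result = pool.submit(() -> "processed");

        // get() blocks until that particular task has completed.
        System.out.println(result.get()); // prints "processed"

        pool.shutdown();
    }
}
```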
But be aware of several pitfalls on the way. You now face several questions, and the answers to those questions will have a great impact on your ability to do parallel processing. If the answer to the first question is yes then, sadly, parallel processing is not an option.
Upvotes: 1
Reputation: 310980
I agree with you. Throw this away and use a PriorityBlockingQueue.
The only issue here is that if Thread 1 has read an object from File 1 in and put it in the queue (and the object File 2 was going to read in has an earlier log time), the reading Thread could take it and send it off, resulting in a log object with a later time being sent first
This is exactly like the merge phase of a balanced merge (Knuth, The Art of Computer Programming, vol. 3). You must read the next input from the same file as you got the previous lowest element from.
Does this constraint prohibit me from implementing a multi-threaded design?
It isn't a constraint. It's imaginary.
Or is there a way I can tweak my solution to make it more efficient?
Priority queues are already pretty efficient. In any case you should certainly worry about correctness first. Then add buffering ;-) Wrap the ObjectInputStreams around BufferedInputStreams, and ensure there is a BufferedOutputStream in your output stack.
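A minimal sketch of that buffering, using a temporary file and a plain String payload purely for illustration:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class BufferSketch {

    public static void main(String[] args) throws Exception {
        File log = File.createTempFile("log", ".ser");
        log.deleteOnExit();

        // BufferedOutputStream batches small writes into larger ones.
        try (ObjectOutputStream out = new ObjectOutputStream(
                new BufferedOutputStream(new FileOutputStream(log)))) {
            out.writeObject("hello");
        }

        // BufferedInputStream reads ahead in larger chunks, so the many
        // small reads readObject() performs don't each hit the file.
        try (ObjectInputStream in = new ObjectInputStream(
                new BufferedInputStream(new FileInputStream(log)))) {
            System.out.println(in.readObject()); // prints "hello"
        }
    }
}
```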
Upvotes: 0