Reputation: 2197
Forgive me if the title is a bit vague; I will try to explain what I am trying to accomplish.
There is a function called parsebytes that is part of an external interface I have implemented. It takes an array of bytes and a length. All parsing in this particular program runs on a single thread, so I want to get my data out of parsebytes as quickly as possible so it can return to reading more data off the line. My approach, in pseudocode, is this: create a separately running thread (ParserThreadClass). Every time parsebytes is called, put the bytes into a queue in the ParserThreadClass by looping through the array and calling byteQueue.add(bytes[i]). That loop is wrapped in a synchronized (byteQueue) block. That should, in effect, free parsebytes to go back and get more data.
While that is happening, my ParserThreadClass is also running. This is the code in its run() method:
while (!shutdown) // while the thread is still running
{
    synchronized (byteQueue)
    {
        bytes.addAll(byteQueue); // bytes is an ArrayList
        byteQueue.clear();
    }
    parseMessage(); // takes the bytes ArrayList and builds an XML message
}
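Putting the pieces together, a minimal, self-contained sketch of the setup described above might look like this (the parsebytes signature, the shutdown flag, and the queuedBytes helper are assumptions filled in for illustration, not the actual code):

```java
import java.util.ArrayList;

// Sketch of the described design: parsebytes copies incoming bytes into a
// shared queue under a lock, and the parser thread drains the queue.
class ParserThreadClass implements Runnable {
    private final ArrayList<Byte> byteQueue = new ArrayList<Byte>();
    private final ArrayList<Byte> bytes = new ArrayList<Byte>();
    private volatile boolean shutdown = false;

    // Called from the reading thread; returns as soon as the copy is done.
    public void parsebytes(byte[] data, int length) {
        synchronized (byteQueue) {
            for (int i = 0; i < length; i++) {
                byteQueue.add(data[i]);
            }
        }
    }

    // Helper added for illustration only: how many bytes are waiting.
    public int queuedBytes() {
        synchronized (byteQueue) {
            return byteQueue.size();
        }
    }

    public void shutdown() { shutdown = true; }

    public void run() {
        while (!shutdown) {
            synchronized (byteQueue) {
                bytes.addAll(byteQueue);
                byteQueue.clear();
            }
            parseMessage(); // would build the XML message from `bytes`
        }
    }

    private void parseMessage() {
        // parsing logic goes here; `bytes` holds everything drained so far
    }
}
```

Note that, as written, the run() loop spins when the queue is empty; a wait/notify pair or a BlockingQueue would avoid that.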
Am I being overly inefficient here? If so, can someone give me an idea of how I should tackle this?
Upvotes: 0
Views: 183
Reputation: 1855
This is how I've tried to solve this problem previously. Basically you have a producer thread, like you have here, that reads the file and puts items onto a queue, and worker threads that take things off the queue and process them. Code is below, but it looks essentially the same as what you're doing.

What I found is that this gave me just about no speedup, because the processing I needed to do per line was pretty quick relative to the disk read. If the parsing you have to do is intensive, or the chunks are large, you could see some speedup this way. But if the parsing is minimal, don't expect much of a performance improvement, because the process is I/O bound. In that situation you would need to parallelize the disk access itself, which you can't really do on a single machine.
public static LinkedBlockingQueue<Pair<String, String>> mappings;
public static final Pair<String, String> end =
        new Pair<String, String>("END", "END");
public static AtomicBoolean done;
public static NpToEntityMapping mapping;
public static Set<String> attested_nps;
public static Set<Entity> possible_entities;

public static class ProducerThread implements Runnable {
    private File f;

    public ProducerThread(File f) {
        this.f = f;
    }

    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(f));
            String line;
            while ((line = reader.readLine()) != null) {
                String entities = reader.readLine();
                String np = line.trim();
                mappings.put(new Pair<String, String>(np, entities));
            }
            reader.close();
            // One "end" marker per worker, so every worker knows to stop.
            for (int i = 0; i < num_threads; i++) {
                mappings.put(end);
            }
        } catch (InterruptedException e) {
            System.out.println("Producer thread interrupted");
        } catch (IOException e) {
            System.out.println("Producer thread threw IOException");
        }
    }
}
public static class WorkerThread implements Runnable {
    private Dictionary dict;
    private EntityFactory factory;

    public WorkerThread(Dictionary dict, EntityFactory factory) {
        this.dict = dict;
        this.factory = factory;
    }

    public void run() {
        try {
            while (!done.get()) {
                Pair<String, String> np_ent = mappings.take();
                if (np_ent == end) {
                    break; // this worker took its end marker, so it is done
                }
                String entities = np_ent.getRight();
                String np = np_ent.getLeft().toLowerCase();
                if (attested_nps == null || attested_nps.contains(np)) {
                    int np_index = dict.getIndex(np);
                    HashSet<Entity> entity_set = new HashSet<Entity>();
                    for (String entity : entities.split(", ")) {
                        Entity e = factory.createEntity(entity.trim());
                        if (possible_entities != null) {
                            possible_entities.add(e);
                        }
                        entity_set.add(e);
                    }
                    mapping.put(np_index, entity_set);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Worker thread interrupted");
        }
    }
}
EDIT:
Here's code for the main thread that starts the producer and worker threads:
Thread producer = new Thread(new ProducerThread(f), "Producer");
producer.start();

ArrayList<Thread> workers = new ArrayList<Thread>();
for (int i = 0; i < num_threads; i++) {
    workers.add(new Thread(new WorkerThread(dict, factory), "Worker"));
}
for (Thread t : workers) {
    t.start();
}

try {
    producer.join();
    for (Thread t : workers) {
        t.join();
    }
} catch (InterruptedException e) {
    System.out.println("Main thread interrupted...");
}
It should also be fine to do the producer thread's work directly in the main thread, removing the need to start and join an extra thread in the main code. Be sure to start the worker threads before going through the file, though, and join them after you've done the work. I'm not sure about the performance difference between that approach and the one shown here.
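That variant, with the main thread playing the producer role, can be sketched as a small self-contained example. This is a stripped-down illustration of the same poison-pill pattern, not the original code: the items are plain strings, and the `processed` counter stands in for the real per-item work.

```java
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Main thread acts as the producer; only the workers are started and joined.
class MainThreadProducer {
    static final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    static final String END = "END";               // poison pill, one per worker
    static final AtomicInteger processed = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        int numThreads = 2;
        ArrayList<Thread> workers = new ArrayList<Thread>();
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item == END) break;    // identity check on the sentinel
                        processed.incrementAndGet(); // real per-item work goes here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Worker");
            workers.add(t);
            t.start();                             // start workers before producing
        }

        // The main thread does the producer's job directly.
        for (int i = 0; i < 100; i++) {
            queue.put("item-" + i);
        }
        for (int i = 0; i < numThreads; i++) {
            queue.put(END);                        // one sentinel per worker
        }
        for (Thread t : workers) {
            t.join();
        }
    }
}
```

Because the sentinel is compared by identity (`==`), a regular data string equal to "END" would still be processed normally; only the shared END object terminates a worker.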
Upvotes: 2