Reputation: 23
I have multiple threads running in my thread pool.
Each thread reads a huge file and returns the data from that file in a List.
The code looks like this:
class Writer{
List<Integer> finalListWhereDataWillBeWritten = new ArrayList<Integer>();
for(query q : allQueries){ //all the read queries to read file
threadPool.submit(new GetDataFromFile(fileName,filePath));
}//all the read queries have been submitted.
}
I know that the following section of code has to go somewhere in my code, but I don't know where to place it. If I place it right after submit() in the for loop, it won't add anything, because each file is huge and its task may not have finished processing yet.
synchronized(finalListWhereDataWillBeWritten){
//process the data obtained from single file and add it to target list
finalListWhereDataWillBeWritten.addAll(dataFromSingleThread);
}
So can anyone please tell me where to place this chunk of code, and what else I need to take care of so that the critical section problem does not occur?
class GetDataFromFile implements Runnable<List<Integer>>{
private String fileName;
private String filePath;
public List<Integer> run(){
//code for streaming the file fileName
return dataObtainedFromThisFile;
}
}
And do I need to use the wait() / notifyAll() methods in my code, given that I'm only reading data from files in parallel threads and placing it in a shared List?
Upvotes: 2
Views: 3597
Reputation: 1506
UPDATE: Please consider the answer provided by Marko, which is far better.
If you want to ensure that your threads all complete before you work on your list, do the following:
import java.util.List;
import java.util.Vector;
public class ThreadWork {
public static void main(String[] args) {
int count = 5;
Thread[] threads = new ListThread[count];
List<String> masterList = new Vector<String>();
for(int index = 0; index < count; index++) {
threads[index] = new ListThread(masterList, "Thread " + (index + 1));
threads[index].start();
}
while(isOperationRunning(threads)) {
// do nothing
}
System.out.println("Done!! Print Your List ...");
for(String item : masterList){
System.out.println("[" + item + "]");
}
}
private static boolean isOperationRunning(Thread[] threads) {
boolean running = false;
for(Thread thread : threads) {
if(thread.isAlive()) {
running = true;
break;
}
}
return running;
}
}
class ListThread extends Thread {
private static String items[] = { "A", "B", "C", "D"};
private List<String> list;
private String name;
public ListThread(List<String> masterList, String threadName) {
list = masterList;
name = threadName;
}
public void run() {
for(int i = 0; i < items.length;++i) {
randomWait();
String data = "Thread [" + name + "][" + items[i] + "]";
System.out.println( data );
list.add( data );
}
}
private void randomWait() {
try {
Thread.sleep((long)(3000 * Math.random()));
}
catch (InterruptedException x) {
Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
}
}
}
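The polling loop in main keeps a CPU core busy while it waits. A minimal sketch of the same wait done with Thread.join(), which blocks until each thread has terminated; the class name JoinDemo, the method runWorkers and the one-line worker body are hypothetical stand-ins for ListThread, not part of the code above:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class JoinDemo {
    // Starts `count` workers that each add one item, then waits for all of them.
    static List<String> runWorkers(int count) throws InterruptedException {
        // Synchronized wrapper so that concurrent add() calls are safe.
        List<String> masterList = Collections.synchronizedList(new ArrayList<String>());
        Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            final String name = "Thread " + (i + 1);
            threads[i] = new Thread(() -> masterList.add(name));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join(); // blocks without spinning until t has terminated
        }
        return masterList;
    }

    public static void main(String[] args) throws InterruptedException {
        // The order of the items depends on thread scheduling; the count does not.
        System.out.println(runWorkers(5).size());
    }
}
```

Because join() also establishes a happens-before relationship, everything the workers added is visible to the main thread once the second loop finishes.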
Upvotes: 0
Reputation: 200158
Instead of reinventing the wheel, you should simply implement Callable<List<Integer>> and submit it to the JDK's standard ExecutorService. Then, as the futures complete, you collect the results into the list.
final ExecutorService threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final List<Future<List<Integer>>> futures = new ArrayList<>();
for(query q : allQueries) {
futures.add(threadPool.submit(new GetDataFromFile(fileName, filePath)));
}
for (Future<List<Integer>> f : futures) {
finalListWhereDataWillBeWritten.addAll(f.get());
}
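For reference, here is a self-contained sketch of the same pattern that compiles on its own. It makes some assumptions: RangeTask is a hypothetical stand-in for GetDataFromFile that generates integers instead of reading a file, and FutureCollectDemo and collectAll are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureCollectDemo {
    // Hypothetical stand-in for GetDataFromFile: returns `size` integers starting at `start`.
    static class RangeTask implements Callable<List<Integer>> {
        private final int start;
        private final int size;

        RangeTask(int start, int size) {
            this.start = start;
            this.size = size;
        }

        @Override
        public List<Integer> call() {
            List<Integer> data = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                data.add(start + i);
            }
            return data;
        }
    }

    static List<Integer> collectAll(int taskCount, int sizePerTask)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            for (int t = 0; t < taskCount; t++) {
                futures.add(pool.submit(new RangeTask(t * sizePerTask, sizePerTask)));
            }
            // Only the calling thread touches the result list,
            // so no synchronized block is needed here.
            List<Integer> result = new ArrayList<>();
            for (Future<List<Integer>> f : futures) {
                result.addAll(f.get()); // blocks until that task has finished
            }
            return result;
        } finally {
            pool.shutdown(); // lets the JVM exit once the tasks are done
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collectAll(3, 4));
    }
}
```

Since the futures are read in submission order, the merged list keeps the order of the queries even if the tasks finish in a different order. No wait() / notifyAll() is involved: Future.get() does the waiting.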
And this is all assuming you are below Java 8. With Java 8 you would of course use a parallel stream:
final List<Integer> finalListWhereDataWillBeWritten =
allQueries.parallelStream()
.flatMap(q -> getDataFromFile(q.fileName, q.filePath))
.collect(toList());
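A runnable sketch of the stream variant follows. Note that flatMap expects a Stream for each element, so this sketch assumes getDataFromFile returns a List and calls .stream() on it. The int file id and the generated data are hypothetical placeholders for the real file reading, and ParallelStreamDemo and readAll are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamDemo {
    // Hypothetical stand-in for reading one file: three integers derived from the id.
    static List<Integer> getDataFromFile(int fileId) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            data.add(fileId * 10 + i);
        }
        return data;
    }

    static List<Integer> readAll(List<Integer> fileIds) {
        return fileIds.parallelStream()
                // flatMap needs a Stream per element, hence .stream() on the returned List
                .flatMap(id -> getDataFromFile(id).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Encounter order of the source list is preserved by collect(toList()).
        System.out.println(readAll(Arrays.asList(1, 2, 3)));
        // prints [10, 11, 12, 20, 21, 22, 30, 31, 32]
    }
}
```

Parallel streams run on the common ForkJoinPool by default, which is sized for CPU-bound work, so for blocking file I/O an explicit thread pool may still be the better fit.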
Upvotes: 3