veer
veer

Reputation: 23

Multiple Threads in a thread pool writing data in same List

I'm having multiple threads running in my threadPool Each thread reads a huge file and returns the data from this file in a List.

Code looks like :

class Writer{


   ArrayList finalListWhereDataWillBeWritten = new Array<Integer>()
   for(query q : allQueries){ //all the read queries to read file

      threadPool.submit(new GetDataFromFile(fileName,filePath));        

   }//all the read queries have been submitted.

}

Now I know that following section of code will occur some where in my code but I don't know where to place it. Because if I place it just after submit() in for loop it'll not add it because each file is very huge and may not have completed its processing.

synchronized(finalListWhereDataWillBeWritten){

  //process the data obtained from single file and add it to target list 
      finalListWhereDataWillBeWritten.addAll(dataFromSingleThread); 

}

So can anyone please tell me that where do I place this chunk of code and what other things I need to make sure of so that Critical Section Problem donot occur.

class GetDataFromFile implements Runnable<List<Integer>>{

   private String fileName;
   private String filePath;

   public List<Integer> run(){
       //code for streaming the file fileName 
       return dataObtainedFromThisFile;
   }

}

And do i need to use wait() / notifyAll() methods in my code given that I'm only reading data from files parallely in threads and placing them in a shared List

Upvotes: 2

Views: 3597

Answers (2)

Constantin
Constantin

Reputation: 1506

UPDATE Please consider the answer provided by Marko which is far better

If you want to ensure that your threads all complete before you work on your list, do the following:

import java.util.List;
import java.util.Vector;

public class ThreadWork {

  public static void main(String[] args) {

    int count = 5;
    Thread[] threads = new ListThread[count];
    List<String> masterList = new Vector<String>();

    for(int index = 0; index < count; index++) {
      threads[index] = new ListThread(masterList, "Thread " + (index + 1));
      threads[index].start();
    }
    while(isOperationRunning(threads)) {
      // do nothing
    }

    System.out.println("Done!! Print Your List ...");

    for(String item : masterList){
      System.out.println("[" + item + "]");
    }
  }

  private static boolean isOperationRunning(Thread[] threads) {
    boolean running = false;

    for(Thread thread : threads) {
      if(thread.isAlive()) {
        running = true;
        break;
      }
    }
    return running;
  }
}

class ListThread extends Thread {
  private static String items[] = { "A", "B", "C", "D"};
  private List<String> list;
  private String name;

  public ListThread(List<String> masterList, String threadName) {
    list = masterList;
    name = threadName;
  }

  public void run() {
    for(int i = 0; i < items.length;++i) {
      randomWait();
      String data = "Thread [" + name + "][" + items[i] + "]";
      System.out.println( data );
      list.add( data );
    }
  }

  private void randomWait() {
    try {
      Thread.currentThread();
      Thread.sleep((long)(3000 * Math.random()));
    }
    catch (InterruptedException x) {}
  }
}

Upvotes: 0

Marko Topolnik
Marko Topolnik

Reputation: 200158

Instead of reinventing the wheel you should simply implement Callable<List<Integer>> and submit it to the JDK's standard Executor Service. Then, as the futures complete, you collect the results into the list.

final ExecutorService threadPool = 
    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final List<Future<List<Integer>>> futures = new ArrayList<>();
for(query q : allQueries) {
  futures.add(threadPool.submit(new GetDataFromFile(fileName, filePath)));
}
for (Future<List<Integer>> f : futures) {
    finalListWhereDataWillBeWritten.addAll(f.get());
}

And this is all assuming you are below Java 8. With Java 8 you would of course use a parallel stream:

final List<Integer> finalListWhereDataWillBeWritten =
  allQueries.parallelStream()
            .flatMap(q -> getDataFromFile(q.fileName, q.filePath))
            .collect(toList());

Upvotes: 3

Related Questions