Reputation: 826
I am new to multithreading in Java. I have implemented a multithreaded program to process an array and would appreciate help and suggestions for optimising and refactoring it.
Scenario
We receive a huge CSV file with thousands of rows that we need to process.
I convert the rows to an array, split it, and pass each subset of the array to the processing routine.
Right now I split the array into 20 equal subsets and pass them to 20 threads for execution. This takes about 2 minutes, which is fine. Without multithreading it takes 30 minutes.
Help needed
A snapshot of my code is below. Although it works, I am wondering whether there is a way to standardize and refactor it, because right now it looks clumsy. To be more specific, it would be great if I could parameterize the thread runners instead of creating each one individually.
Code
private static void ProcessRecords(List<String[]> inputCSVData)
{
// Do some operation
}
**In the main program**
public static void main(String[] args)throws ClassNotFoundException, SQLException, IOException, InterruptedException
{
int size = csvData.size();
// Split the array
int firstArraySize = (size / 20);
int secondArrayEndIndex = (firstArraySize * 2) - 1;
csvData1 = csvData.subList(1, firstArraySize);
csvData2 = csvData.subList(firstArraySize, secondArrayEndIndex);
// .... and so on
Thread thread1 = new Thread(new Runnable() {
public void run() {
try {
ProcessRecords(csvData1);
} catch (ClassNotFoundException | SQLException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
Thread thread2 = new Thread(new Runnable() {
public void run()
{
try {
ProcessRecords(csvData2);
} catch (ClassNotFoundException | SQLException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
**and so on for 20 times**
thread1.start();
thread2.start();
//... For all remaining threads
// thread20.start();
thread1.join();
thread2.join();
//... For all remaining threads
// thread20.join();
}
Upvotes: 1
Views: 239
Reputation: 44965
Since Java 7, you can implement such a mechanism efficiently out of the box thanks to the Fork/Join Framework. Starting from Java 8, you can do it directly with the Stream API, more precisely with a parallel stream, which uses a ForkJoinPool behind the scenes and leverages its work-stealing algorithm to keep all worker threads busy.
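For readers who want to see what the Fork/Join Framework looks like when used directly, here is a minimal sketch. It assumes the work can be split recursively by halving the list; the class names, the `THRESHOLD` value, and the "count the rows" stand-in for real processing are all hypothetical and not taken from the question.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSketch {
    // Hypothetical cut-off below which a slice is processed sequentially.
    static final int THRESHOLD = 100;

    static class CountRows extends RecursiveTask<Integer> {
        private final List<String[]> rows;

        CountRows(List<String[]> rows) {
            this.rows = rows;
        }

        @Override
        protected Integer compute() {
            if (rows.size() <= THRESHOLD) {
                // Stand-in for the real per-slice work: just count the rows.
                return rows.size();
            }
            int mid = rows.size() / 2;
            CountRows left = new CountRows(rows.subList(0, mid));
            CountRows right = new CountRows(rows.subList(mid, rows.size()));
            left.fork();                        // run the left half asynchronously
            int rightResult = right.compute();  // process the right half in this thread
            return left.join() + rightResult;   // wait for the left half and combine
        }
    }

    static int countAll(List<String[]> csvData) {
        return ForkJoinPool.commonPool().invoke(new CountRows(csvData));
    }
}
```

In practice the parallel stream shown in this answer does this splitting for you, so writing a task class by hand is mostly useful when you need control over how the data is divided.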
In your case, you could process it line by line as follows:
csvData.parallelStream().forEach(MyClass::ProcessRecord);
With a method ProcessRecord of the class MyClass of the form:
private static void ProcessRecord(String[] inputCSVData){
// Do some operation
}
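One thing the one-liner relies on is that the per-row method is safe to call from several threads at once. The sketch below is a self-contained illustration under that assumption; the class name, the `processed` counter, and the generated rows are hypothetical placeholders for the real data and the real operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelStreamSketch {
    // Hypothetical shared state; an AtomicInteger keeps the update thread-safe.
    static final AtomicInteger processed = new AtomicInteger();

    // Placeholder for the real per-row operation. It is called concurrently,
    // so anything it touches must be thread-safe.
    static void processRecord(String[] row) {
        processed.incrementAndGet();
    }

    static int processAll(List<String[]> csvData) {
        processed.set(0);
        // forEach returns only after every row has been handled.
        csvData.parallelStream().forEach(ParallelStreamSketch::processRecord);
        return processed.get();
    }

    public static void main(String[] args) {
        List<String[]> csvData = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            csvData.add(new String[] {"id" + i, "value" + i});
        }
        System.out.println("rows processed: " + processAll(csvData));
    }
}
```

If the real operation writes to a database, each call would typically need its own connection or a connection pool, since a single shared connection is usually not meant to be used from many threads at once.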
By default, a parallel stream uses the common ForkJoinPool, whose size is based on Runtime.getRuntime().availableProcessors(). That is enough for tasks with very little IO. If your tasks are IO-heavy and you would like a larger pool, simply submit the initial task to your own ForkJoinPool; the parallel stream will then run in your pool instead of the common pool.
ForkJoinPool forkJoinPool = new ForkJoinPool(20);
forkJoinPool.submit(() -> csvData.parallelStream().forEach(MyClass::ProcessRecord)).get();
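A slightly fuller version of the custom-pool idea might look like the sketch below. It adds a `finally` block so the pool's threads are released, and it collects results in a concurrent set purely so the outcome can be checked. The class name, the `runInPool` helper, and the "record the first column" work are hypothetical. Also note that, as far as I know, a parallel stream running in the pool that submitted it is observed behaviour of the implementation rather than something the Stream API documentation promises.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

class CustomPoolSketch {
    // Runs the per-row work inside a dedicated pool with the given parallelism
    // and returns the set of first-column values that were seen.
    static Set<String> runInPool(List<String[]> csvData, int parallelism)
            throws InterruptedException, ExecutionException {
        Set<String> seen = ConcurrentHashMap.newKeySet(); // thread-safe set
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // get() blocks until the whole stream has finished and rethrows
            // any failure wrapped in an ExecutionException.
            pool.submit(() -> csvData.parallelStream()
                    .forEach(row -> seen.add(row[0]))).get();
        } finally {
            pool.shutdown(); // release the worker threads
        }
        return seen;
    }
}
```

Calling `runInPool(csvData, 20)` would mirror the pool size used in the answer.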
Upvotes: 4
Reputation: 3570
You have done a lot of redundant work to get here. Instead of hard-coding 20 threads, you can use an ExecutorService backed by a fixed thread pool (Executors.newFixedThreadPool) and submit tasks to it.
Also, how was the value of 20 for the number of threads decided? Use
Runtime.getRuntime().availableProcessors();
to determine the number of cores available at runtime.
// Needs java.util.concurrent.ExecutorService, Executors and TimeUnit
public static void main(String[] args) throws ClassNotFoundException, SQLException, IOException, InterruptedException {
    int size = csvData.size();
    int threadCount = Runtime.getRuntime().availableProcessors();
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    int index = 0;
    // Round up so the loop always advances and no trailing rows are dropped
    int chunkSize = Math.max(1, (size + threadCount - 1) / threadCount);
    while (index < size) {
        final int start = index;
        // subList takes an exclusive end index, not a length
        final int end = Math.min(start + chunkSize, size);
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    ProcessRecords(csvData.subList(start, end));
                } catch (ClassNotFoundException | SQLException | IOException e) {
                    e.printStackTrace();
                }
            }
        });
        index += chunkSize;
    }
    executorService.shutdown();
    // Block until all submitted tasks have finished
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
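A variant that reports failures back to the caller, instead of only printing them inside each task, could use `invokeAll` with `Callable` tasks. This is a minimal sketch under assumptions: the class name, the `processInChunks` helper, and the `processRecords` stand-in (which just returns the number of rows it was given) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedExecutorSketch {
    // Placeholder for the real batch operation; returns the rows handled.
    static int processRecords(List<String[]> rows) {
        return rows.size();
    }

    static int processInChunks(List<String[]> csvData, int threadCount)
            throws InterruptedException, ExecutionException {
        int size = csvData.size();
        // Ceiling division so every row lands in exactly one chunk.
        int chunkSize = Math.max(1, (size + threadCount - 1) / threadCount);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int start = 0; start < size; start += chunkSize) {
            int end = Math.min(start + chunkSize, size);
            List<String[]> chunk = csvData.subList(start, end);
            tasks.add(() -> processRecords(chunk));
        }
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            int total = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> result : executor.invokeAll(tasks)) {
                total += result.get(); // rethrows a task failure as ExecutionException
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }
}
```

With this shape the number of threads is a single parameter, and an exception in any chunk surfaces in the calling thread rather than being lost in a stack trace.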
Upvotes: 1