VVP

Reputation: 826

Alternate approach to processing a huge array using multithreading

I am new to multithreading in Java. I have implemented a multithreaded program in Java to process an array, and I would like your help and suggestions to optimise and refactor it if possible.

Scenario
We get a huge CSV file with thousands of rows that we need to process. I convert the rows to a list of arrays, split that list into subsets, and pass each subset to the processing method. Right now I split the list into 20 equal subsets and pass them to 20 threads for execution. This takes about 2 minutes, which is fine; without multithreading it takes 30 minutes.

Help needed

Below is a snapshot of my code. Although it works fine, I am wondering whether there is a way to standardize and refactor it, because right now it looks clumsy. To be more specific, it would be great if I could parameterize the work instead of creating individual thread runners.

Code

private static void ProcessRecords(List<String[]> inputCSVData)    
{    

// Do some operation    
}    


**In the main program**    

public static void main(String[] args)throws ClassNotFoundException, SQLException, IOException, InterruptedException         
{            
    int size = csvData.size();        
    // Split the array        
    int firstArraySize = (size / 20);        
    int secondArrayEndIndex = (firstArraySize * 2) - 1;        

    csvData1 = csvData.subList(1, firstArraySize);    
    csvData2 = csvData.subList(firstArraySize, secondArrayEndIndex);    
    // ....    and so on    

    Thread thread1 = new Thread(new Runnable() {    
    public void run() {    
    try {    
    ProcessRecords(csvData1);        
    } catch (ClassNotFoundException | SQLException | IOException e) {    
    // TODO Auto-generated catch block    
    e.printStackTrace();    
    }    
    }    
    });    

    Thread thread2 = new Thread(new Runnable() {    
    public void run() 
    {     
    try {    
    ProcessRecords(csvData2);            
    } catch (ClassNotFoundException | SQLException | IOException e) {    
    // TODO Auto-generated catch block    
    e.printStackTrace();        
    }    
    }    
    });    

    **and so on for 20 times**    

    thread1.start();     
    thread2.start();    
    //... For all remaining threads    
    // thread20.start();    

    thread1.join();    
    thread2.join();    
    //... For all remaining threads    
    // thread20.join();    

    }  

Upvotes: 1

Views: 239

Answers (2)

Nicolas Filotto

Reputation: 44965

Since Java 7, you can implement such a mechanism efficiently out of the box thanks to the Fork/Join framework. Starting with Java 8, you can do it directly with the Stream API, more precisely with a parallel stream, which uses a ForkJoinPool behind the scenes and leverages its work-stealing algorithm to get the best possible performance.

In your case, you could process it line by line as follows:

csvData.parallelStream().forEach(MyClass::ProcessRecord);

Here ProcessRecord is a method of the class MyClass with this signature:

private static void ProcessRecord(String[] inputCSVData){
    // Do some operation
}

By default, a parallel stream uses the common ForkJoinPool, whose size is derived from Runtime.getRuntime().availableProcessors(); this is enough for tasks with very little IO. If your tasks do IO and you would like a larger pool, submit the initial task to your own ForkJoinPool; the parallel stream will then run in that pool instead of the common pool.

ForkJoinPool forkJoinPool = new ForkJoinPool(20);
forkJoinPool.submit(() -> csvData.parallelStream().forEach(MyClass::ProcessRecord)).get();
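For completeness, here is a minimal, self-contained sketch of the custom-pool approach above. The class name ParallelStreamSketch, the helper processAll and the row-counting body of processRecord are hypothetical stand-ins for the real per-row work, not part of the original code. It also shuts the pool down afterwards, which the two-line snippet leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelStreamSketch {

    // Thread-safe counter, used here only to show that every row was visited.
    private static final AtomicInteger processed = new AtomicInteger();

    // Hypothetical stand-in for the real per-row work.
    static void processRecord(String[] row) {
        processed.incrementAndGet();
    }

    // Runs the parallel stream inside a dedicated pool and returns the number of rows handled.
    static int processAll(List<String[]> rows, int parallelism) {
        processed.set(0);
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // get() blocks until the whole stream has been consumed.
            pool.submit(() -> rows.parallelStream().forEach(ParallelStreamSketch::processRecord)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Processing a row failed", e.getCause());
        } finally {
            pool.shutdown(); // release the worker threads
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(new String[] {"id" + i, "value" + i});
        }
        System.out.println(processAll(rows, 20)); // prints 1000
    }
}
```

Note that running a parallel stream inside the pool that submitted it is long-standing behaviour of the implementation rather than something the Stream API documents, so treat it as an assumption worth testing on your JDK.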

Upvotes: 4

Imesha Sudasingha

Reputation: 3570

You have done a lot of redundant work to get here. Instead of hard-coding 20 threads, you can use an ExecutorService backed by a fixed thread pool and submit tasks to it.

Also, how was the value of 20 for the number of threads decided? Use

Runtime.getRuntime().availableProcessors();

to determine the core count at runtime.

// Needs java.util.concurrent.ExecutorService, Executors and TimeUnit
public static void main(String[] args) throws ClassNotFoundException, SQLException, IOException, InterruptedException {
    int size = csvData.size();
    int threadCount = Runtime.getRuntime().availableProcessors();
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

    // Ceiling division, so chunkSize is never 0 and every row lands in some chunk
    int chunkSize = Math.max(1, (size + threadCount - 1) / threadCount);
    int index = 0;
    while (index < size) {
        final int start = index;
        // subList takes an exclusive end index, not a length
        final int end = Math.min(start + chunkSize, size);
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    ProcessRecords(csvData.subList(start, end));
                } catch (ClassNotFoundException | SQLException | IOException e) {
                    e.printStackTrace();
                }
            }
        });
        index += chunkSize;
    }
    executorService.shutdown();

    // Block until all submitted chunks have finished instead of polling isTerminated()
    while (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
        // still running; keep waiting
    }
}
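As a variation on the chunking idea in this answer, the splitting can be pulled into a small helper and the chunks handed to invokeAll, which blocks until every task has finished. This is only a sketch under assumptions: the class ChunkedProcessing, the helpers partition and processInParallel, and the row-counting body of processRecords are hypothetical and stand in for the real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedProcessing {

    // Splits rows into at most `parts` contiguous sublists (parts must be >= 1).
    // Every chunk has ceil(n / parts) rows except possibly the last, which may be shorter.
    static <T> List<List<T>> partition(List<T> rows, int parts) {
        List<List<T>> chunks = new ArrayList<>();
        if (rows.isEmpty()) {
            return chunks;
        }
        int chunkSize = (rows.size() + parts - 1) / parts; // ceiling division
        for (int start = 0; start < rows.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, rows.size()); // subList end is exclusive
            chunks.add(rows.subList(start, end));
        }
        return chunks;
    }

    // Hypothetical stand-in for the real work; returns how many rows it handled.
    static int processRecords(List<String[]> chunk) {
        return chunk.size();
    }

    // Runs one task per chunk on a fixed pool and returns the total number of rows handled.
    static int processInParallel(List<String[]> rows, int threadCount) {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (List<String[]> chunk : partition(rows, threadCount)) {
                tasks.add(() -> processRecords(chunk));
            }
            int total = 0;
            // invokeAll returns only after all tasks have completed.
            for (Future<Integer> result : executor.invokeAll(tasks)) {
                total += result.get(); // rethrows any failure from the task
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A chunk failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(new String[] {"id" + i});
        }
        System.out.println(processInParallel(rows, 4)); // prints 1000
    }
}
```

Using Callable and Future.get() means a failure inside a chunk surfaces in the caller instead of only being printed from the worker thread.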

Upvotes: 1
