raghuram gururajan

Reputation: 563

Split files in a directory uniformly across threads in Java

I have a directory containing a variable number of files, and I process them with several Java threads. The number of threads depends on the number of processors available:

int numberOfThreads=Runtime.getRuntime().availableProcessors();

File[] inputFilesArr=currentDirectory.listFiles();

How do I split the files uniformly across threads? If I do simple math like

int filesPerThread=inputFilesArr.length/numberOfThreads;

then I might end up missing some files if inputFilesArr.length is not evenly divisible by numberOfThreads. What is an efficient way to partition the files so that the load is uniform across all the threads?

Upvotes: 0

Views: 1568

Answers (5)

Evgeniy Dorofeev

Reputation: 136002

If you are dealing with I/O, multiple threads can make progress concurrently even on a single processor, because while one thread is waiting on read(byte[]) the processor can run another thread.

Anyway, this is my solution:

int nThreads = 2;
File[] files = new File[9];
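// At least 1, so that fewer files than threads yields one task per file
// instead of an empty first task.
int filesPerThread = Math.max(1, files.length / nThreads);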

class Task extends Thread {
    List<File> list = new ArrayList<>();
    // implement run here
}

Task task = new Task();
List<Task> tasks = new ArrayList<>();
tasks.add(task);
for (int i = 0; i < files.length; i++) {
    if (task.list.size() == filesPerThread && files.length - i >= filesPerThread) {
        task = new Task();
        tasks.add(task);
    }
    task.list.add(files[i]);
}
for(Task t : tasks) {
    System.out.println(t.list.size());
}

This prints 4 and then 5.

Note that it will create 3 threads if you have 3 files and 5 processors.

Upvotes: 0

Santosh

Reputation: 17893

Here is another take on this problem:

  1. Use Java's ThreadPoolExecutor. Here is an example.
  2. It works on the thread-pool principle: instead of creating a thread every time you need one, a specified number of threads is kept in a pool and reused.
  3. The idea is to treat the processing of each file in the directory as an independent task that any thread in the pool can perform.
  4. Submit all the tasks to the executor in a loop; this makes sure that no files are left out.
  5. The executor adds all of these tasks to a queue and, at the same time, hands them to threads from the pool until all the threads are busy.
  6. The remaining tasks wait in the queue until a thread becomes available, so configuring the pool size is vital here. You can have as many threads as there are files, or fewer.

Here I have assumed that the files can be processed independently of one another and that no particular group of files has to be processed by the same thread.
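To make the steps above concrete, here is a minimal sketch, assuming a fixed-size pool from Executors.newFixedThreadPool (which is backed by a ThreadPoolExecutor). The class name PoolPerFile, the method processAll, and the handler parameter are hypothetical names standing in for your own per-file processing; they are not from the question.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class PoolPerFile {

    // Submits one task per file to a fixed-size pool, waits for all of them,
    // and returns the number of tasks submitted.
    // 'handler' is a hypothetical stand-in for the real per-file work.
    static int processAll(File[] files, int nThreads, Consumer<File> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (File f : files) {
                // Every file becomes its own task, so none can be left out.
                pending.add(pool.submit(() -> handler.accept(f)));
            }
            for (Future<?> task : pending) {
                try {
                    task.get(); // blocks until this task has finished
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("a file task failed", e.getCause());
                }
            }
            return pending.size();
        } finally {
            pool.shutdown(); // lets the pool threads exit once the queue is drained
        }
    }

    public static void main(String[] args) {
        File[] files = new File(".").listFiles();
        if (files == null) files = new File[0]; // listFiles() returns null on failure
        int n = Runtime.getRuntime().availableProcessors();
        int count = processAll(files, n, f -> System.out.println(f.getName()));
        System.out.println("processed " + count + " entries");
    }
}
```

With this approach you never compute filesPerThread at all: the pool's queue does the balancing, and a thread that finishes early simply picks up the next task.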

Upvotes: 2

bowmore

Reputation: 11280

The producer-consumer pattern will solve this gracefully. Have one producer (the main thread) put all the files on a bounded blocking queue (see BlockingQueue). Then have a number of worker threads each take a file from the queue and process it.

The work (rather than the files) will be uniformly distributed over the threads, since a thread that has finished processing one file comes back and asks for the next file to process. This avoids the possible problem of one thread being assigned only large files while the other threads get only small ones.
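A rough sketch of that pattern follows, assuming an ArrayBlockingQueue as the bounded queue and a sentinel object (a "poison pill") to tell each worker when to stop. The names FileQueueWorkers, run, STOP and handler are hypothetical, and error handling is kept to a minimum: if handler throws, that worker dies and the producer can block.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class FileQueueWorkers {

    // Sentinel that tells a worker there is no more work (hypothetical helper).
    private static final File STOP = new File("");

    // Runs nWorkers consumer threads; returns how many files each one handled.
    static int[] run(File[] files, int nWorkers, Consumer<File> handler) {
        if (nWorkers < 1) throw new IllegalArgumentException("need at least one worker");
        BlockingQueue<File> queue = new ArrayBlockingQueue<>(nWorkers * 2);
        int[] handled = new int[nWorkers]; // each worker writes only its own slot
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < nWorkers; w++) {
            final int id = w;
            Thread t = new Thread(() -> {
                try {
                    // Identity comparison: only the STOP object itself ends the loop.
                    for (File f = queue.take(); f != STOP; f = queue.take()) {
                        handler.accept(f);
                        handled[id]++;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }
        try {
            for (File f : files) queue.put(f);                  // blocks while the queue is full
            for (int w = 0; w < nWorkers; w++) queue.put(STOP); // one sentinel per worker
            for (Thread t : workers) t.join();                  // join makes 'handled' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while feeding the queue", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        File[] files = new File(".").listFiles();
        if (files == null) files = new File[0]; // listFiles() returns null on failure
        int[] perWorker = run(files, Runtime.getRuntime().availableProcessors(),
                f -> System.out.println(f.getName()));
        System.out.println("workers used: " + perWorker.length);
    }
}
```

The per-worker counts will usually differ from run to run, which is the point: a worker that drew cheap files simply takes more of them.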

Upvotes: 1

vishal_aim

Reputation: 7854

You can compute the range (start and end index in inputFilesArr) of the files for each thread:

if (inputFilesArr.length < numberOfThreads)
        numberOfThreads = inputFilesArr.length;

int[][] filesRangePerThread = getFilesRangePerThread(inputFilesArr.length, numberOfThreads);

and

private static int[][] getFilesRangePerThread(int filesCount, int threadsCount)
{
    int[][] filesRangePerThread = new int[threadsCount][2];

    if (threadsCount > 1)
    {
        float odtRangeIncrementFactor = (float) filesCount / threadsCount;
        float lastEndIndexSet = odtRangeIncrementFactor - 1;
        int rangeStartIndex = 0;
        int rangeEndIndex = Math.round(lastEndIndexSet);

        filesRangePerThread[0] = new int[] { rangeStartIndex, rangeEndIndex };

        for (int processCounter = 1; processCounter < threadsCount; processCounter++)
        {
            rangeStartIndex = rangeEndIndex + 1;
            lastEndIndexSet += odtRangeIncrementFactor;
            rangeEndIndex = Math.round(lastEndIndexSet);
            filesRangePerThread[processCounter] = new int[] { rangeStartIndex, rangeEndIndex };
        }
    }
    else
    {
        filesRangePerThread[0] = new int[] { 0, filesCount - 1 };
    }

    return filesRangePerThread;
}
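For comparison, the same kind of ranges can be computed with integer arithmetic only, which avoids any float rounding. This is a sketch of a different technique (quotient plus remainder), not the method above: every thread gets filesCount / threadsCount files and the first filesCount % threadsCount threads get one extra. The names EvenRanges and split are hypothetical.

```java
class EvenRanges {

    // Returns one {startIndex, endIndexInclusive} pair per thread.
    // Range sizes differ by at most one, and no more ranges are produced
    // than there are files.
    static int[][] split(int filesCount, int threadsCount) {
        int parts = Math.max(0, Math.min(filesCount, threadsCount));
        int[][] ranges = new int[parts][2];
        if (parts == 0) return ranges;   // nothing to distribute
        int base = filesCount / parts;   // minimum number of files per thread
        int extra = filesCount % parts;  // this many threads get one more file
        int start = 0;
        for (int i = 0; i < parts; i++) {
            int size = base + (i < extra ? 1 : 0);
            ranges[i][0] = start;
            ranges[i][1] = start + size - 1;
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 files over 4 threads: sizes 3, 3, 2, 2
        for (int[] r : split(10, 4)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```

For 10 files and 4 threads this yields the ranges 0..2, 3..5, 6..7 and 8..9.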

Upvotes: 0

Shivam

Reputation: 2132

You can use a round-robin algorithm to distribute the files evenly by count. Here is the pseudocode:

ProcessThread t[] = new ProcessThread[Number of Cores];
int i = 0;
foreach(File f in files)
{
    t[i++ % t.length].queueForProcessing(f);
}

foreach(Thread tt in t)
{
    tt.join();
}
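A runnable Java version of the pseudocode could look roughly like this. It is only a sketch: instead of a ProcessThread class with a queueForProcessing method, it deals the files into one list per thread and then starts a plain Thread for each list. The names RoundRobinFiles, deal, processAll and handler are hypothetical, and nThreads is assumed to be at least 1.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RoundRobinFiles {

    // Deals the files out like cards: file i goes to bucket i % nThreads.
    static List<List<File>> deal(File[] files, int nThreads) {
        List<List<File>> buckets = new ArrayList<>();
        for (int t = 0; t < nThreads; t++) buckets.add(new ArrayList<>());
        for (int i = 0; i < files.length; i++) {
            buckets.get(i % nThreads).add(files[i]);
        }
        return buckets;
    }

    // Starts one thread per non-empty bucket and waits for all of them.
    static void processAll(File[] files, int nThreads, Consumer<File> handler) {
        List<Thread> threads = new ArrayList<>();
        for (List<File> bucket : deal(files, nThreads)) {
            if (bucket.isEmpty()) continue; // fewer files than threads
            Thread t = new Thread(() -> bucket.forEach(handler));
            t.start();
            threads.add(t);
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        File[] files = new File(".").listFiles();
        if (files == null) files = new File[0]; // listFiles() returns null on failure
        processAll(files, Runtime.getRuntime().availableProcessors(),
                f -> System.out.println(Thread.currentThread().getName() + " " + f.getName()));
    }
}
```

Bucket sizes differ by at most one, but this balances the number of files, not the amount of work, so one thread can still end up with all the large files.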

Upvotes: 1
