Zevs
Zevs

Reputation: 21

Multithreaded merge sort, adding additional threads

I am facing a problem with a multithreaded merge sort algorithm in Java.

I need to modify the code so that it sorts with 3, 4, 5, 6, 7, or 8 threads by dividing the original array into subarrays. Currently it uses 2 subarrays. How can I split the original array into 3, 4, 5, 6, 7, or 8 subarrays to achieve my goal? Moreover, the mergeSort method currently calls only the leftHalf and rightHalf methods, so it seems that for 3, 4, 5, 6, 7, or 8 threads I would have to write additional methods. How can I handle this?

two_threaded_merge_sort.java

/**
 * Sorts a large random array with two worker threads: each worker merge-sorts
 * one half, and the two sorted halves are then combined by {@link #finalMerge}.
 */
public class two_threaded_merge_sort {

    /**
     * Merges two individually sorted arrays into one newly allocated sorted array.
     *
     * <p>The inputs are not modified. Either input may be empty, in which case
     * the result is a copy of the other one.
     *
     * @param a first sorted array
     * @param b second sorted array
     * @return a new array of length {@code a.length + b.length} holding every
     *         element of {@code a} and {@code b} in ascending order
     */
    public static int[] finalMerge(int[] a, int[] b) {
        int[] result = new int[a.length + b.length];
        int i = 0;
        int j = 0;
        int r = 0;
        while (i < a.length && j < b.length) {
            // "<=" takes from a on ties, so equal elements keep their a-before-b order.
            if (a[i] <= b[j]) {
                result[r++] = a[i++];
            } else {
                result[r++] = b[j++];
            }
        }
        // At most one input still has elements. Copying the tails after the loop
        // (rather than inside it) also covers the case where an input is empty
        // and the loop body never runs.
        int restOfA = a.length - i;
        System.arraycopy(a, i, result, r, restOfA);
        System.arraycopy(b, j, result, r + restOfA, b.length - j);
        return result;
    }

    /**
     * Fills an array with random values, sorts its two halves on separate
     * worker threads, merges them, and prints the elapsed wall-clock time.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for a worker
     */
    public static void main(String[] args) throws InterruptedException {
        Random rand = new Random();
        int[] original = new int[9000000];
        for (int i = 0; i < original.length; i++) {
            original[i] = rand.nextInt(1000);
        }

        long startTime = System.currentTimeMillis();
        int half = original.length / 2;
        int[] subArr1 = new int[half];
        int[] subArr2 = new int[original.length - half];
        System.arraycopy(original, 0, subArr1, 0, subArr1.length);
        System.arraycopy(original, half, subArr2, 0, subArr2.length);

        Worker runner1 = new Worker(subArr1);
        Worker runner2 = new Worker(subArr2);
        runner1.start();
        runner2.start();
        // Both halves must be fully sorted before they can be merged.
        runner1.join();
        runner2.join();
        // The merged array is the fully sorted copy of "original".
        int[] sorted = finalMerge(runner1.getInternal(), runner2.getInternal());
        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;
        System.out.println("2-thread MergeSort takes: " + (float)elapsedTime/1000 + " seconds");
    }

}

Worker.java

/**
 * Thread that merge-sorts the array handed to its constructor, in place.
 * After the thread has finished, the sorted data is available through
 * {@link #getInternal()}.
 */
class Worker extends Thread {
    /** The array this worker sorts when it runs. */
    private int[] internal;

    /**
     * @param arr array to sort; it is kept by reference, not copied
     */
    Worker(int[] arr) {
        internal = arr;
    }

    /**
     * @return the array passed to the constructor (sorted once {@link #run()} has completed)
     */
    public int[] getInternal() {
        return internal;
    }

    /** Sorts the constructor-supplied array. */
    @Override
    public void run() {
        mergeSort(internal);
    }

    /**
     * Recursively sorts {@code array} in ascending order using top-down merge sort.
     *
     * @param array array to sort in place
     */
    public void mergeSort(int[] array) {
        if (array.length <= 1) {
            // Zero or one element: nothing to do.
            return;
        }
        int[] lower = leftHalf(array);
        int[] upper = rightHalf(array);

        mergeSort(lower);
        mergeSort(upper);

        merge(array, lower, upper);
    }

    /**
     * Copies the first {@code array.length / 2} elements into a new array.
     *
     * @param array source array
     * @return a new array holding the leading half
     */
    public int[] leftHalf(int[] array) {
        int[] left = new int[array.length / 2];
        System.arraycopy(array, 0, left, 0, left.length);
        return left;
    }

    /**
     * Copies everything after the first {@code array.length / 2} elements into
     * a new array (the larger half when the length is odd).
     *
     * @param array source array
     * @return a new array holding the trailing half
     */
    public int[] rightHalf(int[] array) {
        int offset = array.length / 2;
        int[] right = new int[array.length - offset];
        System.arraycopy(array, offset, right, 0, right.length);
        return right;
    }

    /**
     * Merges two sorted arrays into {@code result}, filling every slot of
     * {@code result} from index 0 upward.
     *
     * @param result destination array
     * @param left   first sorted input
     * @param right  second sorted input
     */
    public void merge(int[] result, int[] left, int[] right) {
        int l = 0;
        int r = 0;

        for (int out = 0; out < result.length; out++) {
            // Take from the right only when it still has elements and either the
            // left is exhausted or the right element is strictly smaller.
            boolean takeRight = r < right.length
                    && (l >= left.length || left[l] > right[r]);
            if (takeRight) {
                result[out] = right[r];
                r++;
            } else {
                result[out] = left[l];
                l++;
            }
        }
    }
}

Thanks very much!

Upvotes: 3

Views: 2800

Answers (2)

David Pérez Cabrera
David Pérez Cabrera

Reputation: 5048

From my point of view, the hard work is already done. Now you must parameterize the algorithm by the number of threads.

Your algorithm has two parts

  1. split the work.
  2. merge the k-parts.

And two components:

  1. Main algorithm
  2. Workers.

About the threads

In my opinion, the start/join approach isn't useful in this case, because the final merge can't start until all threads have finished. I prefer a '2-way merge' (see @rcgldr's answer) and a thread pool (ExecutorService). You must be careful with thread synchronization and shared memory.

To sum up, I propose a little different solution:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

/**
 * In-place merge sort of an {@code int[]} that sorts fragments of the array on
 * a thread pool and merges neighbouring fragments as soon as both are sorted.
 *
 * <p>Use {@link #sort(int[], int)}; instances are created internally, one per
 * sort call.
 */
public class MultithreadedMergeSort {

    private final int[] array;
    private final int numThreads;
    /**
     * Sorted fragments that are still waiting for a sorted neighbour, stored as
     * {@code {begin, end}} pairs (end exclusive). Only accessed from
     * synchronized methods of this object.
     */
    private final List<int[]> sortedFragments;
    /** Set once the whole array is sorted or a worker failed; guarded by this object's monitor. */
    private boolean finished;
    /** First failure reported by a worker, or {@code null}; guarded by this object's monitor. */
    private Throwable failure;

    private MultithreadedMergeSort(int numThreads, int[] array) {
        this.numThreads = numThreads;
        this.array = array;
        this.sortedFragments = new ArrayList<>(numThreads);
    }

    /**
     * Basic algorithm: recursively sorts the fragment {@code [begin, end)}.
     */
    private static void recursiveMergeSort(int[] array, int begin, int end) {
        if (end - begin > 1) {
            // Written as begin + half-width so the sum cannot overflow int.
            int middle = begin + (end - begin) / 2;
            recursiveMergeSort(array, begin, middle);
            recursiveMergeSort(array, middle, end);
            merge(array, begin, middle, end);
        }
    }

    /**
     * Basic algorithm: merges the two consecutive sorted fragments
     * {@code [begin, middle)} and {@code [middle, end)} in place.
     */
    private static void merge(int[] array, int begin, int middle, int end) {
        // Only the first fragment is copied; the second is consumed in place,
        // which is safe because the write index never passes the read index j.
        int[] firstPart = Arrays.copyOfRange(array, begin, middle);
        int i = 0;
        int j = middle;
        int k = begin;
        while (i < firstPart.length && j < end) {
            if (firstPart[i] <= array[j]) {
                array[k++] = firstPart[i++];
            } else {
                array[k++] = array[j++];
            }
        }
        // Leftovers of the second fragment are already in their final position;
        // only leftovers of the copied first fragment have to be written back.
        if (i < firstPart.length) {
            System.arraycopy(firstPart, i, array, k, firstPart.length - i);
        }
    }

    /**
     * Sorts {@code array} in ascending order, in place.
     *
     * <p>A {@code null}, empty or single-element array is left untouched. With
     * {@code numThreads <= 1} the array is sorted on the calling thread.
     * Otherwise the array is split into {@code min(numThreads, array.length)}
     * fragments that are sorted and merged on a thread pool, and this method
     * blocks until the whole array is sorted.
     *
     * @param array      array to sort; may be {@code null}
     * @param numThreads number of worker threads to use
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     * @throws IllegalStateException if a worker thread failed; the cause is attached
     */
    public static void sort(int[] array, int numThreads) throws InterruptedException {
        if (array == null || array.length < 2) {
            return;
        }
        if (numThreads > 1) {
            new MultithreadedMergeSort(numThreads, array).mergeSort();
        } else {
            recursiveMergeSort(array, 0, array.length);
        }
    }

    /**
     * Splits the array, hands every fragment to a pool thread and waits until
     * the workers report that the whole array is sorted.
     */
    private synchronized void mergeSort() throws InterruptedException {
        // Never create more fragments than elements: an empty fragment shares
        // both of its bounds with a neighbour, which makes the neighbour
        // matching in the workers ambiguous and can lose a range for good.
        int fragments = Math.min(numThreads, array.length);
        ExecutorService executors = Executors.newFixedThreadPool(fragments);
        try {
            int end = 0;
            for (int i = 1; i <= fragments; i++) {
                int begin = end;
                // long arithmetic: array.length * i may exceed Integer.MAX_VALUE.
                end = (int) ((long) array.length * i / fragments);
                executors.execute(new MergeSortWorker(begin, end));
            }
            // This thread holds the monitor until wait() releases it, so no
            // worker can report before we are waiting. The loop guards against
            // spurious wakeups.
            while (!finished) {
                wait();
            }
        } finally {
            // Also reached on interruption, so the pool threads can terminate.
            executors.shutdown();
        }
        if (failure != null) {
            throw new IllegalStateException("Merge sort worker failed", failure);
        }
    }

    /**
     * Called by a worker once {@code [begin, end)} is sorted.
     *
     * @return an adjacent sorted fragment the caller must now merge with its
     *         own, or {@code null} if the caller has nothing more to do
     */
    private synchronized int[] notifyFragmentSorted(int begin, int end) {
        if (begin == 0 && end == array.length) {
            // The array is completely sorted: wake up the waiting caller.
            finished = true;
            notifyAll();
            return null;
        }
        // Look for an already sorted fragment directly before or after this one.
        Iterator<int[]> it = sortedFragments.iterator();
        while (it.hasNext()) {
            int[] f = it.next();
            if (f[1] == begin || f[0] == end) {
                it.remove();
                return f;
            }
        }
        // No sorted neighbour yet: park this fragment for a neighbour to pick up.
        sortedFragments.add(new int[] {begin, end});
        return null;
    }

    /** Records a worker failure and releases the waiting caller instead of leaving it blocked. */
    private synchronized void notifyFailure(Throwable cause) {
        if (failure == null) {
            failure = cause;
        }
        finished = true;
        notifyAll();
    }

    /**
     * Sorts one fragment, then keeps merging it with adjacent sorted fragments
     * for as long as {@link #notifyFragmentSorted} hands one back.
     */
    private class MergeSortWorker implements Runnable {

        private int begin;
        private int end;

        public MergeSortWorker(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        public void run() {
            try {
                // Sort this worker's own fragment.
                recursiveMergeSort(array, begin, end);
                // Report it; a non-null answer is a neighbour to merge with.
                int[] nearFragment = notifyFragmentSorted(begin, end);

                while (nearFragment != null) {
                    int middle;
                    if (nearFragment[1] == begin) {
                        // The neighbour lies directly before this fragment.
                        middle = begin;
                        begin = nearFragment[0];
                    } else {
                        // The neighbour lies directly after this fragment.
                        middle = end;
                        end = nearFragment[1];
                    }
                    merge(array, begin, middle, end);
                    nearFragment = notifyFragmentSorted(begin, end);
                }
            } catch (RuntimeException | Error e) {
                // Without this the caller would wait forever for a completion
                // signal that can no longer arrive.
                notifyFailure(e);
            }
        }
    }

    /**
     * Small timing demo: sorts nine million random ints with five threads.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while sorting
     */
    public static void main(String[] args) throws InterruptedException {
        int numThreads = 5;

        Random rand = new Random();
        int[] original = new int[9000000];
        for (int i = 0; i < original.length; i++) {
            original[i] = rand.nextInt(1000);
        }

        long startTime = System.currentTimeMillis();

        MultithreadedMergeSort.sort(original, numThreads);

        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;
        // warning: Take care with microbenchmarks
        System.out.println(numThreads + "-thread MergeSort takes: " + (float) elapsedTime / 1000 + " seconds");
    }
}

Upvotes: 1

rcgldr
rcgldr

Reputation: 28826

There needs to be a sort function that separates the array into k parts, then creates k threads to sort each part, using either a top-down or bottom-up approach (bottom-up would be slightly faster), and waits for all threads to complete.

At this point there are k sorted parts. These could be merged all at once using a k-way merge (complicated), or merged a pair of parts at a time (2 way merge), perhaps using multiple threads, but at this point the process is probably memory bandwidth limited, so multi-threading may not help much.

When separating the array into k parts, something like this can be used to keep the sizes similar:

// Pseudocode: choose a size for each of k parts so that every size is
// either n / k or n / k + 1 (n is presumably the array length; confirm).
// r = number of parts that still need one extra element
int r = n % k;
// s = base size shared by every part
int s = n / k;
int t;
for each part{
    // t is 1 while extra elements remain to be handed out, otherwise 0
    t = r ? 1 : 0;
    r -= t;
    size = s + t;
}

or

// Pseudocode: same part sizes as a two-phase loop (n is presumably the
// array length and k the number of parts; confirm).
// r = number of parts that receive the larger size
int r = n % k;
int s = n / k + 1;
// first phase: r parts of the larger size
while(r--){
   next part size = s; // n / k + 1
}
// second phase: every remaining part gets the smaller size
s -= 1;
while not done{
   next part size = s; // n / k
}

Upvotes: 2

Related Questions