leonaugust

Multithreading (ExecutorService) method works unexpectedly

I can't figure out where the mistake is. I'm trying to write a multithreaded program that finds the strings in an array that consist of the same characters (anagrams of each other). Those strings should then be printed together with their indexes.

Example input: {"qwe","wqe","qwee","a","a"};
Output:
a = 3, 4
eqw = 0, 1 

But when I run the program, the indexes come out wrong. I tried synchronizing my methods in several ways: synchronized (this), synchronized methods, and a separately created lock object for each map. Am I missing something that would make it work?

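import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {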

  private volatile Map<String, Integer> countByString = new ConcurrentHashMap<>();
  private volatile Map<String, String> indexesByString = new ConcurrentHashMap<>();

  public static void main(String[] args) {
    String[] arr = {"qwe", "wqe", "qwee", "a", "a"};
    Main main = new Main();
    List<Callable<Void>> tasks = new ArrayList<>();
    AtomicInteger i = new AtomicInteger(0);
    Arrays.stream(arr).forEach(str -> {
      tasks.add(() -> {
        char[] charArr = str.toCharArray();
        Arrays.sort(charArr);
        String sortedStr = new String(charArr);
        main.calculateCount(sortedStr);
        main.calculateIndex(sortedStr, i.getAndIncrement());
        return null;
      });
    });

    try {
      ExecutorService executorService = Executors.newFixedThreadPool(4);
      executorService.invokeAll(tasks);
      executorService.shutdown();
    } catch (Exception e) {
      e.printStackTrace();
    }
    main.printResult();
  }

  void calculateCount(String str) {
    synchronized (countByString) {
      int count = countByString.get(str) == null ? 0 : countByString.get(str);
      countByString.put(str, ++count);
    }
  }

  void calculateIndex(String str, int index) {
    synchronized (indexesByString) {
      System.out.println(Thread.currentThread().getName() + " " + str + " " + index);
      String indexes = indexesByString.get(str);
      if (indexes == null) {
        indexes = "";
      }
      indexes += (index + ";");
      indexesByString.put(str, indexes);
    }
  }

  private void printResult() {
    for (Map.Entry<String, Integer> entry : countByString.entrySet()) {
      String str = entry.getKey();
//      Integer count = entry.getValue();
//      if (count >= 2) {
      String indexes = indexesByString.get(str);
      System.out.println(str + " = " + indexes);
//      }
    }
  }
}

Answers (1)

dreamcrash

The problem with your code is this part:

Arrays.stream(arr).forEach(str -> {
  tasks.add(() -> {
    char[] charArr = str.toCharArray();
    Arrays.sort(charArr);
    String sortedStr = new String(charArr);
    main.calculateCount(sortedStr);
    main.calculateIndex(sortedStr, i.getAndIncrement());
    return null;
  });
});

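Each task gets its string from the enclosing forEach, but it gets its index from i.getAndIncrement(), which is evaluated only when the task runs, not when it is created. The executor runs the tasks in no guaranteed order, so, for example, the task for arr[3] ("a") may start first and be recorded with index 0. Synchronizing calculateCount or calculateIndex cannot fix this, because the string and the index are already mismatched before either method is called. What you should do instead is take the string and the index from the same place, for example by distributing the loop iterations among the threads. Something similar to: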

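// Submit one task per thread in the pool (4 here, matching newFixedThreadPool(4)).
for (int t = 0; t < 4; t++) {
  tasks.add(() -> {
    // Each getAndIncrement() hands out a distinct index, and that same index
    // is used both to pick the string and to record its position.
    for (int threadIndex = i.getAndIncrement(); threadIndex < arr.length; threadIndex = i.getAndIncrement()) {
      char[] charArr = arr[threadIndex].toCharArray();
      Arrays.sort(charArr);
      String sortedStr = new String(charArr);
      main.calculateCount(sortedStr);
      main.calculateIndex(sortedStr, threadIndex);
    }
    return null;
  });
}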

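I used the atomicity of AtomicInteger.getAndIncrement() to emulate a dynamic distribution of the loop iterations among the threads: every index is handed out exactly once, and a thread that finishes early simply claims the next unprocessed one.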
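A self-contained sketch of that fix, assuming Java 8 or later. The class name DynamicAnagramIndexes and the helper method group are hypothetical, and a TreeMap of index lists takes the place of the question's two maps, since the count is just the size of each list. Sorting each list at the end is only there to make the output stable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DynamicAnagramIndexes {

  // Sorted string -> indexes of every input string that sorts to it.
  // Only ever accessed while holding this map's lock.
  private final Map<String, List<Integer>> indexesByString = new TreeMap<>();

  private void calculateIndex(String sortedStr, int index) {
    synchronized (indexesByString) {
      indexesByString.computeIfAbsent(sortedStr, k -> new ArrayList<>()).add(index);
    }
  }

  // Hypothetical helper: groups the strings of arr using 'threads' worker threads.
  static Map<String, List<Integer>> group(String[] arr, int threads) {
    DynamicAnagramIndexes main = new DynamicAnagramIndexes();
    AtomicInteger i = new AtomicInteger(0);
    List<Callable<Void>> tasks = new ArrayList<>();
    for (int t = 0; t < threads; t++) { // one task per pool thread
      tasks.add(() -> {
        // getAndIncrement() hands out each index exactly once, and the same
        // index selects the string and is recorded with it.
        for (int idx = i.getAndIncrement(); idx < arr.length; idx = i.getAndIncrement()) {
          char[] charArr = arr[idx].toCharArray();
          Arrays.sort(charArr);
          main.calculateIndex(new String(charArr), idx);
        }
        return null;
      });
    }
    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    try {
      executorService.invokeAll(tasks); // blocks until every task has finished
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executorService.shutdown();
    }
    synchronized (main.indexesByString) {
      // Threads finish in any order, so sort each index list for stable output.
      main.indexesByString.values().forEach(list -> list.sort(null));
      return main.indexesByString;
    }
  }

  public static void main(String[] args) {
    String[] arr = {"qwe", "wqe", "qwee", "a", "a"};
    group(arr, 4).forEach((key, indexes) -> {
      if (indexes.size() >= 2) { // same filter as the commented-out check
        System.out.println(key + " = " + indexes);
      }
    });
  }
}
```

With the example input, main prints "a = [3, 4]" followed by "eqw = [0, 1]", regardless of which thread handles which index.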

There are other issues with the code that you have provided, mostly concerning performance and synchronization overhead.

private volatile Map<String, Integer> countByString = new ConcurrentHashMap<>();
private volatile Map<String, String> indexesByString = new ConcurrentHashMap<>();

In this context you need neither volatile nor ConcurrentHashMap; you could make those fields final instead. The volatile modifier can be removed because both fields are assigned once, when main is created before the parallel region, and anything a thread does before submitting a task to an ExecutorService is visible to that task. The ConcurrentHashMap can be replaced with a plain HashMap because the tasks already access those maps only inside synchronized blocks.
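As a sketch of those two changes, assuming Java 8 or later: both fields become final HashMaps, and every access goes through a synchronized block on the map. Map.merge replaces the get/put pair but computes the same values. The class name SynchronizedMaps and the helpers countOf, indexesOf and countConcurrently are hypothetical, added only so the example can run on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SynchronizedMaps {

  // final instead of volatile: each field is assigned exactly once, before any
  // task is submitted. Plain HashMap instead of ConcurrentHashMap: every read
  // and write below happens while holding that map's own lock.
  private final Map<String, Integer> countByString = new HashMap<>();
  private final Map<String, String> indexesByString = new HashMap<>();

  void calculateCount(String str) {
    synchronized (countByString) {
      countByString.merge(str, 1, Integer::sum); // absent -> 1, otherwise +1
    }
  }

  void calculateIndex(String str, int index) {
    synchronized (indexesByString) {
      indexesByString.merge(str, index + ";", String::concat); // append "index;"
    }
  }

  int countOf(String str) {
    synchronized (countByString) {
      return countByString.getOrDefault(str, 0);
    }
  }

  String indexesOf(String str) {
    synchronized (indexesByString) {
      return indexesByString.get(str);
    }
  }

  // Hypothetical helper: 'threads' tasks each call calculateCount(str) 'times' times.
  static int countConcurrently(String str, int threads, int times) {
    SynchronizedMaps maps = new SynchronizedMaps();
    List<Callable<Void>> tasks = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      tasks.add(() -> {
        for (int n = 0; n < times; n++) {
          maps.calculateCount(str);
        }
        return null;
      });
    }
    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    try {
      executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executorService.shutdown();
    }
    return maps.countOf(str);
  }

  public static void main(String[] args) {
    System.out.println(countConcurrently("a", 4, 10_000)); // prints 40000
  }
}
```

Because every increment happens under the map's lock, no update is lost; with an unsynchronized HashMap, concurrent get/put pairs could overwrite each other.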

Once the code is correct, you should try to minimize the synchronization overhead. For instance, each thread could fill its own private copies of countByString and indexesByString, and those copies could be merged (reduced) sequentially after the parallel work is done.
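A minimal sketch of that idea, assuming Java 8 or later: each task fills a map that only it can see, returns it from call(), and the calling thread merges the returned maps one by one after invokeAll returns, so no lock is needed during the parallel part. LocalMapsThenReduce and group are hypothetical names, and index lists stand in for the question's count and index-string maps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class LocalMapsThenReduce {

  static Map<String, List<Integer>> group(String[] arr, int threads) {
    AtomicInteger next = new AtomicInteger(0);
    List<Callable<Map<String, List<Integer>>>> tasks = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      tasks.add(() -> {
        // Thread-private result: no other thread ever sees this map, so no locking.
        Map<String, List<Integer>> local = new HashMap<>();
        for (int idx = next.getAndIncrement(); idx < arr.length; idx = next.getAndIncrement()) {
          char[] chars = arr[idx].toCharArray();
          Arrays.sort(chars);
          local.computeIfAbsent(new String(chars), k -> new ArrayList<>()).add(idx);
        }
        return local;
      });
    }
    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    try {
      // Sequential reduction on the calling thread once the parallel part is done.
      Map<String, List<Integer>> result = new TreeMap<>();
      for (Future<Map<String, List<Integer>>> f : executorService.invokeAll(tasks)) {
        f.get().forEach((key, idxs) ->
            result.computeIfAbsent(key, k -> new ArrayList<>()).addAll(idxs));
      }
      result.values().forEach(list -> list.sort(null)); // stable output order
      return result;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executorService.shutdown();
    }
  }

  public static void main(String[] args) {
    String[] arr = {"qwe", "wqe", "qwee", "a", "a"};
    group(arr, 4).forEach((key, indexes) -> {
      if (indexes.size() >= 2) {
        System.out.println(key + " = " + indexes);
      }
    });
  }
}
```

main prints "a = [3, 4]" and then "eqw = [0, 1]". The only shared state during the parallel phase is the AtomicInteger that hands out indexes.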

Naturally, with an input this small you are unlikely to measure any meaningful difference between these optimizations.
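If you would rather keep the question's one-task-per-string structure, a smaller change also avoids the index mix-up: read the index while each task is being created rather than when it runs. This is a different technique from the AtomicInteger loop above (there is still one task per string, so no dynamic load balancing). The sketch assumes Java 8 or later; IndexAtCreation and group are hypothetical names, and IntStream.range gives each lambda its own captured index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class IndexAtCreation {

  static Map<String, List<Integer>> group(String[] arr, int threads) {
    Map<String, List<Integer>> indexesByString = new TreeMap<>();
    List<Callable<Void>> tasks = new ArrayList<>();
    // 'index' is fixed while each task is being created, so it no longer
    // depends on the order in which the executor happens to run the tasks.
    IntStream.range(0, arr.length).forEach(index -> tasks.add(() -> {
      char[] charArr = arr[index].toCharArray();
      Arrays.sort(charArr);
      String sortedStr = new String(charArr);
      synchronized (indexesByString) {
        indexesByString.computeIfAbsent(sortedStr, k -> new ArrayList<>()).add(index);
      }
      return null;
    }));
    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    try {
      executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executorService.shutdown();
    }
    synchronized (indexesByString) {
      // Tasks may finish in any order, so sort each index list for stable output.
      indexesByString.values().forEach(list -> list.sort(null));
      return indexesByString;
    }
  }

  public static void main(String[] args) {
    String[] arr = {"qwe", "wqe", "qwee", "a", "a"};
    group(arr, 4).forEach((key, indexes) -> System.out.println(key + " = " + indexes));
  }
}
```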
