vippu

Reputation: 13

Executor tasks for parallelizing a method

I am trying to delete n files from disk, which are listed in listDir. I split listDir into 4 parts and delete the parts in parallel, the intention being to make the deletion faster than doing it sequentially. deleteObject(x, credential, token) can be assumed to be an API that deletes an object from the disk as an atomic operation. It returns true on a successful delete and false otherwise.

I have a few questions:

  1. I have 4 parallel tasks which I execute through invokeAll, and the pool created by Executors.newFixedThreadPool(4) has 4 threads. Will each task always be assigned its own thread?
  2. Do I need to synchronize, or use volatile for, the loop index 'i' in the for loop of the parallelDeleteOperation() method? I am asking because, suppose the 1st thread has not completed its task (deleting listDir1, with the for loop still running), gets context-switched midway, and a 2nd thread starts executing the same task (deleting listDir1). Could the 2nd thread get an IndexOutOfBoundsException in this case?
  3. Is there any advantage in dividing the list into 4 parts and executing them in parallel, rather than having multiple threads execute the delete operation on one very big list?
  4. If one of the operations submitted to the ExecutorService returns false, then deleteMain() as a whole should return false.

     private boolean deleteMain(String parent, List<Structure>
        listDir, String place, String node, Sequence<String>
                                   groups, String Uid) throws IOException {
    
    int noCores = Runtime.getRuntime().availableProcessors();
    // one sublist of Structure entries per core
    List<List<Structure>> splittedList = splitList(listDir, noCores);
    
    System.out.println(splittedList.size());
    
    System.out.println("NoOfCores" + noCores);
    Set<Callable<Boolean>> callables = new HashSet<Callable<Boolean>>();
    for (int i = 0; i < splittedList.size(); i++) {
        List<Structure> l = splittedList.get(i);
        callables.add(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                // each task deletes only its own sublist
                return parallelDeleteOperation(parent, l, place, node, groups, Uid);
            }
        });
    }
    
    ExecutorService service = Executors.newFixedThreadPool(noCores);
    
    try {
        List<Future<Boolean>> futures = service.invokeAll(callables);
    
    
        for (Future<Boolean> future : futures) {
            if (future.get() != true)
                return future.get();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    service.shutdown();
    return true;
    }
    
    private Boolean parallelDeleteOperation(String parent, List<Structure>
        listDir, String place, String node, Sequence<String>
                                                groups, String Uid) throws IOException {
    for (int i = 0; i < listDir.size(); i++) {
        final String name = listDir.get(i).filename;
    
        final String filePath = "/" + (parent.isEmpty() ? "" : (parent +
                "/")) + name;
        final DeleteMessage message = new DeleteMessage(name, place, node,
                filePath);
        final boolean Status = delete(message, groups, Uid, place);
        if (Status != true)
            return Status;
    }
    return true;
    }
    

Upvotes: 1

Views: 251

Answers (2)

Maurice Perry

Reputation: 9650

I don't see any advantage in splitting the list into 4 sublists: as soon as a thread finishes its sublist, it will sit idle. If you submit one task for each element of the input list, all four threads will stay active until the queue is empty.

UPDATE: as someone noted, you can use a parallelStream, which is simpler and probably faster; but if you want to keep the ExecutorService, you can do something like this:

    int noCores = Runtime.getRuntime().availableProcessors();
    List<Future<Boolean>> futures = new ArrayList<>();
    ExecutorService service = Executors.newFixedThreadPool(noCores);
    try {
        for (Structure s: listDir) {
            String name = s.filename;
            String filePath = "/" + (parent.isEmpty() ? "" : (parent
                + "/")) + name;
            Future<Boolean> result = service.submit(()-> {
                final DeleteMessage message = new DeleteMessage(
                        name, place, node, filePath);
                return delete(message, groups, Uid, place);
            });
            futures.add(result);
        }
    } finally {
        service.shutdown();
    }
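
The snippet above submits the tasks but never reads the futures. Here is a minimal sketch of how the results might be combined into a single boolean, so that one failed delete makes the whole call return false. The names deleteAll and deleteOne are hypothetical; the Predicate only stands in for the real delete(message, groups, Uid, place) call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

public class DeleteAllSketch {

    // Hypothetical helper: deleteOne stands in for the real delete(...) call.
    static boolean deleteAll(List<String> names, Predicate<String> deleteOne) {
        int noCores = Runtime.getRuntime().availableProcessors();
        ExecutorService service = Executors.newFixedThreadPool(noCores);
        List<Future<Boolean>> futures = new ArrayList<>();
        try {
            // one task per file, so no thread idles while work is still queued
            for (String name : names) {
                Callable<Boolean> task = () -> deleteOne.test(name);
                futures.add(service.submit(task));
            }
            boolean allDeleted = true;
            for (Future<Boolean> future : futures) {
                try {
                    if (!future.get()) {
                        allDeleted = false; // the delete reported a failure
                    }
                } catch (ExecutionException e) {
                    allDeleted = false;     // the task threw: count it as a failure
                }
            }
            return allDeleted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return false;
        } finally {
            service.shutdown();                 // runs on every exit path
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("a.txt", "b.txt", "c.txt");
        System.out.println(deleteAll(names, n -> true));               // prints true
        System.out.println(deleteAll(names, n -> !n.equals("b.txt"))); // prints false
    }
}
```

This version waits for every task before returning. Whether to stop early on the first failure (for example with shutdownNow()) is a separate design choice that depends on whether a partially deleted list is acceptable.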

Upvotes: 0

Hearen

Reputation: 7828

  1. I have 4 parallel tasks which I execute through invokeAll, and the pool created by Executors.newFixedThreadPool(4) has 4 threads. Will each task always be assigned its own thread?

That is normally the case when the number of tasks equals the number of threads in the pool, but there is no guarantee. It depends largely on the tasks themselves.

        Executors.newFixedThreadPool(4).invokeAll(IntStream.range(0, 8).mapToObj(i -> (Callable<Integer>) () -> {
            System.out.println(Thread.currentThread().getName() + ": " + i);
            return 0;
        }).collect(Collectors.toList()));

If there are more tasks than threads, as above, the output can look like the following. There is no predictable rule for which thread processes which task:

    pool-1-thread-1: 0
    pool-1-thread-2: 1
    pool-1-thread-3: 2
    pool-1-thread-1: 4
    pool-1-thread-1: 5
    pool-1-thread-1: 6
    pool-1-thread-1: 7
    pool-1-thread-4: 3

  2. Do I need to synchronize and use volatile for iterator 'i' in the for loop of parallelDeleteOperation() method.

No, you don't. You have already split your original list into four separate lists, and 'i' is a local variable, so each thread running the method has its own copy of it.

In your code:

    final List listDir1 = listDir.subList(0, listDir.size() / 4);
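
To illustrate why neither synchronized nor volatile is needed for 'i': a local variable lives on the stack of the calling thread, so every invocation of a method gets its own copy even when several threads run that method at the same time. Below is a minimal sketch; countItems is a hypothetical stand-in for parallelDeleteOperation, and the list of 1000 integers is made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LocalIndexSketch {

    // Hypothetical stand-in for parallelDeleteOperation: i is a local variable,
    // so each call, on whichever thread, iterates with its own private i.
    static int countItems(List<Integer> part) {
        int visited = 0;
        for (int i = 0; i < part.size(); i++) {
            part.get(i); // read-only access, comparable to listDir.get(i)
            visited++;
        }
        return visited;
    }

    // Runs countItems on four sublists in parallel and sums the results.
    static int runOnFourParts() {
        List<Integer> all = new ArrayList<>();
        for (int n = 0; n < 1000; n++) {
            all.add(n);
        }
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int p = 0; p < 4; p++) {
                List<Integer> part = all.subList(p * 250, (p + 1) * 250);
                tasks.add(() -> countItems(part));
            }
            int total = 0;
            for (Future<Integer> f : service.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnFourParts()); // prints 1000
    }
}
```

The guarantee only covers the local variable. If another thread added or removed elements of the underlying list while these loops were running, that shared state would need its own synchronization.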

As for your question No. 3:

  3. Is there any advantage in dividing the list into 4 parts and executing them in parallel, rather than having multiple threads execute the delete operation on one very big list?

You had better run some tests under realistic conditions. It is too complicated to say in general whether one approach is better than the other.

When several threads work on one big list, contention between them can be more severe than when the list is split up beforehand, although the splitting itself adds some overhead.

Besides, even without any parallelism, the performance may not be bad. And on a multi-user system, the parallelized version can even be worse than the sequential one because of thread context-switching overhead.

You have to test them; a simple timing harness could be:

    Long start = 0L;
    List<Long> list = new ArrayList<>();
    for (int i = 0; i < 1_000; ++i) {
        start = System.nanoTime();
        // your method to be tested;
        list.add(System.nanoTime() - start);
    }
    System.out.println("Time cost summary: " + list.stream().collect(Collectors.summarizingLong(Long::valueOf)));

  4. If one of the operation of ExecutorService returns false then on the whole deleteMain() API would return false

I'd refactor your code as follows, which makes it cleaner and also meets your last requirement (No. 4):

    // use the number of cores as the split count;
    // assuming the work is CPU-bound, parallelStream can be used directly;
    private static void testThreadPool(List<Integer> listDir) {
        // isFailed == true if at least one of the tasks returned false;
        boolean isFailed = splitList(listDir, Runtime.getRuntime().availableProcessors()).stream()
                .parallel().map(YourClass::parallelDeleteOperation).anyMatch(ret -> ret == false);
    }


    // split the list into "count" consecutive sublists;
    private static <T> List<List<T>> splitList(List<T> list, int count) {
        List<List<T>> listList = new ArrayList<>();
        // round the block size up so that trailing elements are not dropped
        int blockSize = (list.size() + count - 1) / count;
        for (int i = 0; i < count; ++i) {
            int from = Math.min(i * blockSize, list.size());
            int to = Math.min((i + 1) * blockSize, list.size());
            listList.add(list.subList(from, to));
        }
        return listList;
    }

Upvotes: 1
