Andrey Mironov
Andrey Mironov

Reputation: 91

Java Fair Reentrant lock isn't fair in multiple iterations

I am trying to write a test, in which I want to demonstrate the difference between fair and unfair Reentrant locks. The test uses ThreadPoolExecutor and consists of multiple iterations, each of which has the following steps:

  1. Create a fair lock to test and semaphore with 1 permit to manage the time of releasing the lock.
  2. Acquire the semaphore.
  3. Submit a task, which acquires the lock, and waits for semaphore to release.
  4. Submit multiple "enumerated" tasks, each of which tries to acquire the lock and then updates the shared AtomicInteger state.
  5. Release the semaphore and wait for all tasks to finish.

So for the fair lock the final value of shared state have to be equal to the index of the last task. But the test fails in ~ 50% of all executions.

My code looks like this:

    @Test
    void should_be_fair() throws InterruptedException, ExecutionException {
        int iterationsCount = 100;
        int waitingThreadsCount = 5;

        ReentrantLock lock = new ReentrantLock(true);
        Semaphore unlockingSemaphore = new Semaphore(1);
        boolean wasAnyThreadUnfair = false;

        for (int i = 0; i < iterationsCount; i++) {
            unlockingSemaphore.acquire();
            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
            Future<?> lockingFuture = executor.submit(() -> {
                try {
                    lock.lock();
                    unlockingSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    unlockingSemaphore.release();
                    lock.unlock();
                }
            });
            AtomicInteger sharedState = new AtomicInteger();
            List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
                    .sequential()
                    .mapToObj(j -> executor.submit(() -> {
                        try {
                            lock.lock();
                            System.out.println("Acquiring lock for j=" + j);
                            return sharedState.updateAndGet((k) -> j);
                        } finally {
                            lock.unlock();
                        }
                    }))
                    .toList();
            unlockingSemaphore.release();
            lockingFuture.get();
            futures.forEach(f -> {
                try {
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            executor.shutdown();
            System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
            if (sharedState.get() != waitingThreadsCount) {
                wasAnyThreadUnfair = true;
                break;
            }
        }

        Assertions.assertThat(wasAnyThreadUnfair).isFalse();
    }

Question:

What's the problem with this test? What can I fix to get the test passed in 100% of executions?

Upvotes: 6

Views: 98

Answers (1)

Andrey Mironov
Andrey Mironov

Reputation: 91

The problem was in the order of acquiring the lock by tasks. The order of submitting is not quaranteed to be equal to the order of starting. So I added the usage of Awaitility library to wait until each task is blocked by lock acquiring. Unfortunately, the test related to unfair lock doesn't pass anymore, but it is another problem.

   @Test
    void should_be_fair() throws InterruptedException, ExecutionException {
        Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(true), 100, 5)).isFalse();
    }

    @Test
    // TODO: doesn't pass
    void should_be_unfair() throws InterruptedException, ExecutionException {
        Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(false), 100, 5)).isTrue();
    }

    private boolean wasAnyThreadUnfair(ReentrantLock lock, int iterationsCount, int waitingThreadsCount)
            throws InterruptedException {
        Semaphore unlockingSemaphore = new Semaphore(1);
        boolean wasAnyThreadUnfair = false;

        for (int i = 0; i < iterationsCount; i++) {
            unlockingSemaphore.acquire();
            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
            Future<?> lockingFuture = executor.submit(() -> {
                try {
                    lock.lock();
                    unlockingSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    unlockingSemaphore.release();
                    lock.unlock();
                }
            });
            AtomicInteger sharedState = new AtomicInteger();
            List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
                    .mapToObj(j -> {
                        Future<Integer> submitted = executor.submit(() -> {
                            try {
                                lock.lock();
                                System.out.println("Acquiring lock for j=" + j);
                                return sharedState.updateAndGet((k) -> j);
                            } finally {
                                lock.unlock();
                            }
                        });
                        await().atMost(150, TimeUnit.MILLISECONDS).until(() -> lock.getQueueLength() == j);
                        return submitted;
                    })
                    .toList();
            unlockingSemaphore.release();
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
            System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
            if (sharedState.get() != waitingThreadsCount) {
                wasAnyThreadUnfair = true;
                break;
            }
        }

        return wasAnyThreadUnfair;
    }

Upvotes: 3

Related Questions