I am looking for a "better" way of pausing and resuming execution of worker threads' tasks.
Usually one would use a boolean flag, such as an AtomicBoolean, which the worker threads check before each iteration of their work.
Something like the code snippet below:
AtomicBoolean b = new AtomicBoolean(true);
Runnable r = () -> {
    while (true) {
        // check for pause
        while (!b.get()) {
            // sleep thread then check again
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // do work
    }
};
Thread p = new Thread(r);
p.start();
This is certainly not an elegant solution, so I have been experimenting with locks and conditions.
Before jumping into my attempt at using locks and conditions, let's first look at the examples and docs. I will be using a ReentrantLock and a Condition as my pause/resume flag in my ExecutorService worker tasks.
ReentrantLock
ReentrantLock.lock() blocks the current thread until the lock can be acquired. An alternative is ReentrantLock.tryLock(), which acquires the lock only if it is not held by another thread at the time of the call, and returns immediately either way.
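To make the difference concrete, here is a minimal sketch for a plain JVM (not Android-specific). The class name TryLockDemo and its two helper methods are made up for this illustration; only ReentrantLock and its lock(), tryLock() and unlock() methods come from the JDK.

```java
import java.util.concurrent.locks.ReentrantLock;

final class TryLockDemo {
    /** tryLock() on a free lock acquires it and returns true immediately. */
    static boolean tryLockWhenFree() {
        ReentrantLock lock = new ReentrantLock();
        boolean acquired = lock.tryLock();
        if (acquired) {
            lock.unlock();
        }
        return acquired;
    }

    /** tryLock() from a second thread while this thread holds the lock returns false without blocking. */
    static boolean tryLockWhileHeldElsewhere() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock(); // the calling thread owns the lock from here on
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock();
                if (acquired[0]) {
                    lock.unlock();
                }
            });
            other.start();
            other.join(); // join() also makes the other thread's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println("free lock: " + tryLockWhenFree());                  // free lock: true
        System.out.println("held elsewhere: " + tryLockWhileHeldElsewhere());   // held elsewhere: false
    }
}
```

Had the second thread called lock() instead of tryLock(), it would have blocked until the first thread released the lock, and the join() above would never return.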
Condition
Condition, by way of the example found here, allows one thread to wait with Condition.await() until another thread tells it to continue with Condition.signal() (or Condition.signalAll()).
Problem:
Reviewing the documentation for Condition.await(), specifically the 4 conditions under which await() returns, leads me to believe that a Condition can only be used to resume a task, not pause one.
The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:
- Some other thread invokes the signal() method for this Condition and the current thread happens to be chosen as the thread to be awakened; or
This means I can call condition.signalAll() from my UI thread on a condition that a worker thread is currently await()ing on, thus scheduling it for execution, i.e. Resume.
However, I cannot see a way to arrive at this state, i.e. Pause, using conditions, since for a thread to go into an await state, the documentation says:
The current thread is assumed to hold the lock associated with this Condition when this method is called. It is up to the implementation to determine if this is the case and if not, how to respond. Typically, an exception will be thrown (such as IllegalMonitorStateException) and the implementation must document that fact.
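The requirement quoted above can be checked with a small sketch on a plain JVM. The class AwaitNeedsLockDemo and its method are hypothetical names for this illustration. It uses the timed await(long, TimeUnit) overload only so that the demo returns even though nobody signals.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class AwaitNeedsLockDemo {
    /** Calls await() with or without holding the lock; returns true if IllegalMonitorStateException was thrown. */
    static boolean awaitThrows(boolean holdLock) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        if (holdLock) {
            lock.lock();
        }
        try {
            // While waiting, the lock is released; it is reacquired before await returns.
            condition.await(10, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true; // the calling thread did not own the lock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("without lock throws: " + awaitThrows(false)); // without lock throws: true
        System.out.println("with lock throws: " + awaitThrows(true));     // with lock throws: false
    }
}
```

The point for pausing is that the worker itself must take the lock, check a "paused" flag, and call await() while holding it. Because await() releases the lock for the duration of the wait, another thread can still acquire it in order to signal.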
My test example using Android
Logic
MainActivity.java
public class MainActivity extends AppCompatActivity {
    private TextView lblCounter;
    private Button btnStartStop, btnPauseResume;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition waitCondition = pauseLock.newCondition();
    private ExecutorService executorService = Executors.newWorkStealingPool(4);
    private AtomicBoolean stopStart = new AtomicBoolean(true);
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        lblCounter = findViewById(R.id.counter);
        btnStartStop = findViewById(R.id.startStop);
        btnPauseResume = findViewById(R.id.pauseResume);
        btnStartStop.setOnClickListener(v -> {
            if (stopStart.get()) {
                start();
            } else {
                stop();
            }
        });
        btnPauseResume.setOnClickListener(v -> {
            pauseResume();
        });
    }
    public void start() {
        btnStartStop.setText("Stop");
        AtomicInteger i = new AtomicInteger(0);
        executorService.execute(() -> {
            while (true) {
                // here we check if the lock is locked; if so, we should await until a signal is emitted
                while (pauseLock.isLocked()) {
                    try {
                        waitCondition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                int i1 = i.incrementAndGet();
                lblCounter.setText(String.valueOf(i1));
            }
        });
    }
    public void stop() {
        executorService.shutdownNow();
        btnStartStop.setText("Start");
    }
    public void pauseResume() {
        if (pauseLock.isLocked()) {
            pauseLock.unlock();
            waitCondition.signal();
            btnPauseResume.setText("Pause");
        } else {
            pauseLock.lock();
            btnPauseResume.setText("Resume");
        }
    }
}
Main Activity XML
<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">
    <TextView
        android:id="@+id/counter"
        android:layout_width="0dp"
        android:layout_height="wrap_content"
        android:layout_marginStart="96dp"
        android:layout_marginTop="8dp"
        android:layout_marginEnd="96dp"
        android:text="123"
        android:textColor="#000000"
        android:textSize="36sp"
        android:textStyle="bold"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toBottomOf="@+id/textView" />
    <Button
        android:id="@+id/startStop"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginBottom="128dp"
        android:text="Start"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintEnd_toStartOf="@+id/pauseResume"
        app:layout_constraintHorizontal_bias="0.5"
        app:layout_constraintStart_toStartOf="parent" />
    <Button
        android:id="@+id/pauseResume"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Pause"
        app:layout_constraintBottom_toBottomOf="@+id/startStop"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintHorizontal_bias="0.5"
        app:layout_constraintStart_toEndOf="@+id/startStop" />
    <TextView
        android:id="@+id/textView"
        android:layout_width="0dp"
        android:layout_height="wrap_content"
        android:layout_marginStart="96dp"
        android:layout_marginTop="128dp"
        android:layout_marginEnd="96dp"
        android:text="Counter"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toTopOf="parent" />
</androidx.constraintlayout.widget.ConstraintLayout>
A problem with the code above is that, in the while loop surrounding waitCondition.await(), I don't acquire the lock on pauseLock. Thus when the user clicks the pauseResume button and I acquire the lock in pauseResume(), the worker thread executes waitCondition.await(), causing the exception below:
E/AndroidRuntime: FATAL EXCEPTION: ForkJoinPool-1-worker-1
Process: nmu.wrpv302.myapplication, PID: 11744
java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1291)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1752)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2064)
at com.example.myapplication.MainActivity.lambda$start$2$MainActivity(MainActivity.java:61)
^ --------------------- PROBLEM -----------------------
at com.example.myapplication.-$$Lambda$MainActivity$te94WnCx7dwprXfxnjJZuoEc1_8.run(Unknown Source:4)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1411)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:285)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1155)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1993)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1941)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Question:
Using Conditions & Locks, how can I improve on the standard boolean flag check + Thread.sleep() for worker threads to pause/resume execution?
Answer:
I think that for your particular example (managing a counter), Conditions, Locks, etc. are overkill.
If we view this process as a single long-lived process, good old Object.wait()/notify()/notifyAll() with an explicitly instantiated Thread should work perfectly:
final Object lock = new Object();
AtomicBoolean b = new AtomicBoolean(true);
Runnable r = () -> {
    _end:
    while (true) {
        while (b.get()) {
            if (Thread.currentThread().isInterrupted()) {
                break _end;
            }
            // increment the counter
        }
        synchronized (lock) {
            while (!b.get()) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break _end;
                }
            }
        }
    }
};
Thread p = new Thread(r);
p.start();
...
b.set(false); // disable
...
synchronized (lock) {
    b.set(true); // enable
    lock.notify(); // to notify for enabled state change
}
...
b.set(false); // disable
ReentrantLock + Condition work much like Object.wait()/notify()/notifyAll() inside a synchronized block. All of Condition's methods must be called while the current thread holds the lock; otherwise you get an IllegalMonitorStateException (see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html):
final ReentrantLock lock = new ReentrantLock();
final Condition enabled = lock.newCondition();
AtomicBoolean b = new AtomicBoolean(true);
Runnable r = () -> {
    _end:
    while (true) {
        while (b.get()) {
            if (Thread.currentThread().isInterrupted()) {
                break _end;
            }
            // increment the counter
        }
        lock.lock();
        try {
            while (!b.get()) {
                try {
                    enabled.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break _end;
                }
            }
        } finally {
            lock.unlock();
        }
    }
};
Thread p = new Thread(r);
p.start();
...
b.set(false); // disable
...
lock.lock(); // if you don't lock before signal, you get IllegalMonitorStateException
try {
    b.set(true); // enable
    enabled.signal(); // to notify for enabled state change
} finally {
    lock.unlock();
}
...
b.set(false); // disable
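The same lock-plus-condition pattern can be wrapped in a small reusable class. The following is only a sketch under assumptions: PauseGate, pause(), resume(), awaitIfPaused() and demo() are names invented here, not part of the JDK or of the code above. It keeps the flag as a plain boolean guarded by the lock instead of an AtomicBoolean, and uses signalAll() so that several workers can share one gate.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class PauseGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private boolean paused; // guarded by lock

    void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    void resume() {
        lock.lock();
        try {
            paused = false;
            resumed.signalAll(); // wake every worker waiting at the gate
        } finally {
            lock.unlock();
        }
    }

    /** Workers call this between units of work; it blocks only while the gate is paused. */
    void awaitIfPaused() throws InterruptedException {
        lock.lock();
        try {
            while (paused) { // loop guards against spurious wakeups
                resumed.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Demo: returns {worker finished while paused, worker finished after resume}. */
    static boolean[] demo() {
        PauseGate gate = new PauseGate();
        AtomicBoolean done = new AtomicBoolean(false);
        gate.pause();
        Thread worker = new Thread(() -> {
            try {
                gate.awaitIfPaused();
                done.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            // Wait until the worker is parked inside await().
            while (worker.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            boolean doneWhilePaused = done.get();
            gate.resume();
            worker.join();
            return new boolean[] { doneWhilePaused, done.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("done while paused: " + result[0]); // done while paused: false
        System.out.println("done after resume: " + result[1]); // done after resume: true
    }
}
```

In a worker loop, a call to gate.awaitIfPaused() would sit at the top of each iteration, and the UI thread would call gate.pause() or gate.resume() from the button handler. Neither call holds the lock for longer than a flag update, so the UI thread never blocks on a waiting worker.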
We can also view the process as a sequence of increment tasks. In this case an ExecutorService is a good choice. Just store a reference to your task so you can cancel it when required:
ExecutorService executor = Executors.newSingleThreadExecutor(); // a single thread guarantees that sequentially submitted tasks never run concurrently
Future<?> task;
task = executor.submit(() -> { // start increment
    while (!Thread.currentThread().isInterrupted()) {
        // increment the counter
    }
});
...
task.cancel(true); // stop increment
...
task = executor.submit(() -> { // start increment again
    while (!Thread.currentThread().isInterrupted()) {
        // increment the counter
    }
});
...
task.cancel(true); // stop increment
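For completeness, here is a runnable sketch of the cancel-and-resubmit idea with a counter that survives between tasks. The class CancelAndResubmitDemo and the variable names are invented for this illustration. The empty task submitted after cancel(true) is just one way to wait until the cancelled task has actually left the single worker thread, and the short sleeps exist only to make the demo observable.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class CancelAndResubmitDemo {
    /** Returns {count right after pausing, count a little later, count after resuming}. */
    static int[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger counter = new AtomicInteger();
        Runnable increment = () -> {
            while (!Thread.currentThread().isInterrupted()) {
                counter.incrementAndGet();
            }
        };
        try {
            Future<?> task = executor.submit(increment); // start
            while (counter.get() == 0) {
                Thread.sleep(1); // wait until the task is really running
            }
            task.cancel(true); // "pause": interrupts the worker thread
            // An empty task runs only once the cancelled one has left the single worker thread.
            executor.submit(() -> { }).get();
            int pausedAt = counter.get();
            Thread.sleep(20);
            int stillPausedAt = counter.get(); // nothing is running, so unchanged

            task = executor.submit(increment); // "resume": same counter, new task
            while (counter.get() == pausedAt) {
                Thread.sleep(1);
            }
            task.cancel(true);
            return new int[] { pausedAt, stillPausedAt, counter.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("paused at " + counts[0] + ", still " + counts[1] + ", resumed to " + counts[2]);
    }
}
```

Because the counter lives outside the task, cancelling and resubmitting behaves like pause and resume from the caller's point of view, with no lock or condition involved.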