Reputation: 17971
I have been working with SwingWorkers for a while and have ended up with a strange behavior, at least for me. I clearly understand that, for performance reasons, several invocations of the publish() method are coalesced into one invocation. It makes perfect sense to me and I suspect SwingWorker keeps some kind of queue to process all those calls.
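For instance, a minimal sketch of a process() override that logs the batch size makes this coalescing visible (several published values may arrive in a single chunks list):

@Override
protected void process(List<String> chunks) {
    // chunks.size() is often > 1: several publish() calls were coalesced
    System.out.println("Received " + chunks.size() + " chunks in one process() call");
}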
According to the tutorial and the API docs, when SwingWorker ends its execution, either because doInBackground() finishes normally or because the worker thread is cancelled from the outside, the done() method is invoked. So far so good.
But I have an example (similar to the one shown in the tutorial) where process() method calls happen after the done() method is executed. Since both methods execute in the Event Dispatch Thread, I would expect done() to be executed after all process() invocations are finished. In other words, I expect this output:
Writing...
Writing...
Stopped!

But sometimes I get this sequence instead:

Writing...
Stopped!
Writing...

Here is an example that reproduces it:
import java.awt.BorderLayout;
import java.awt.Dimension;
import java.awt.Graphics;
import java.awt.event.ActionEvent;
import java.util.List;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
public class Demo {
private SwingWorker<Void, String> worker;
private JTextArea textArea;
private Action startAction, stopAction;
private void createAndShowGui() {
startAction = new AbstractAction("Start writing") {
@Override
public void actionPerformed(ActionEvent e) {
Demo.this.startWriting();
this.setEnabled(false);
stopAction.setEnabled(true);
}
};
stopAction = new AbstractAction("Stop writing") {
@Override
public void actionPerformed(ActionEvent e) {
Demo.this.stopWriting();
this.setEnabled(false);
startAction.setEnabled(true);
}
};
JPanel buttonsPanel = new JPanel();
buttonsPanel.add(new JButton(startAction));
buttonsPanel.add(new JButton(stopAction));
textArea = new JTextArea(30, 50);
JScrollPane scrollPane = new JScrollPane(textArea);
JFrame frame = new JFrame("Test");
frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
frame.add(scrollPane);
frame.add(buttonsPanel, BorderLayout.SOUTH);
frame.pack();
frame.setLocationRelativeTo(null);
frame.setVisible(true);
}
private void startWriting() {
stopWriting();
worker = new SwingWorker<Void, String>() {
@Override
protected Void doInBackground() throws Exception {
while(!isCancelled()) {
publish("Writing...\n");
}
return null;
}
@Override
protected void process(List<String> chunks) {
String string = chunks.get(chunks.size() - 1);
textArea.append(string);
}
@Override
protected void done() {
textArea.append("Stopped!\n");
}
};
worker.execute();
}
private void stopWriting() {
if(worker != null && !worker.isCancelled()) {
worker.cancel(true);
}
}
public static void main(String[] args) {
SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
new Demo().createAndShowGui();
}
});
}
}
Upvotes: 20
Views: 5284
Reputation: 15632
Having read DSquare's superb answer, and concluded from it that some subclassing would be needed, I've come up with this idea for anyone who needs to make sure all published chunks have been processed in the EDT before moving on.
NB I tried to write it in Java rather than Jython (my language of choice and officially the best language in the world), but it is a bit complicated because, for example, publish is final, so you'd have to invent another method to call it, and also because you have to (yawn) parameterise everything with generics in Java.
This code should be understandable by any Java person. Just to help: with self.publication_counter.get(), this evaluates to False when the result is 0.
# Jython imports assumed for this sketch
import time
import java.util.concurrent.atomic
import javax.swing

# this is how you say "Worker... is a subclass of SwingWorker" in Python/Jython
class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ):
    # __init__ is the constructor method in Python/Jython
    def __init__( self ):
        # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object
        self.publication_counter = java.util.concurrent.atomic.AtomicInteger()

    def await_processing_of_all_chunks( self ):
        while self.publication_counter.get():
            time.sleep( 0.001 )

    # fully functional override of the Java method
    def process( self, chunks ):
        for chunk in chunks:
            # DO SOMETHING WITH EACH CHUNK
            pass
        # decrement the counter by the number of chunks received
        # NB do this AFTER dealing with the chunks
        self.publication_counter.addAndGet( - len( chunks ) )

    # fully functional override of the Java method
    def publish( self, *chunks ):
        # increment the counter by the number of chunks received
        # NB do this BEFORE publishing the chunks
        self.publication_counter.addAndGet( len( chunks ) )
        self.super__publish( chunks )
So in your calling code, you put something like:
engine.update_xliff_task.get()
engine.update_xliff_task.await_processing_of_all_chunks()
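For Java readers, here is a minimal, untested sketch of the same counting idea (the names CountingWorker, publishAndCount and awaitProcessingOfAllChunks are invented for illustration; as noted above, publish is final in Java, hence the wrapper method):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.swing.SwingWorker;

// Sketch of the counting idea in Java. Since publish(...) is final,
// doInBackground() must call the wrapper publishAndCount() instead.
abstract class CountingWorker<T, V> extends SwingWorker<T, V> {
    private final AtomicInteger publicationCounter = new AtomicInteger();

    @SafeVarargs
    protected final void publishAndCount(V... chunks) {
        publicationCounter.addAndGet(chunks.length); // count BEFORE publishing
        publish(chunks);
    }

    // Subclasses must call this at the END of their process() override.
    protected void chunksProcessed(List<V> chunks) {
        publicationCounter.addAndGet(-chunks.size()); // decrement AFTER processing
    }

    // Polls until every published chunk has been processed in the EDT.
    public void awaitProcessingOfAllChunks() throws InterruptedException {
        while (publicationCounter.get() != 0) {
            Thread.sleep(1);
        }
    }
}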
PS the use of a while clause like this (i.e. a polling technique) is hardly elegant. I looked at the available java.util.concurrent classes such as CountDownLatch and Phaser (both with thread-blocking methods), but I don't think either would suit this purpose...
Later:
I was interested enough in this to tweak a proper concurrency class (written in Java, found on the Apache site) called CounterLatch. Their version stops the thread at await() if a given value of an AtomicLong counter is reached. My version here allows you either to do that, or the opposite: to say "wait until the counter reaches a certain value before lifting the latch":
NB the use of AtomicLong for signal and AtomicBoolean for released is because in the original Java they use the volatile keyword; I think using the atomic classes will achieve the same purpose.
class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( sync_self, arg ):
                return True

        self.sync = Sync()

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos( 1, unit.toNanos( timeout ) )
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        # NB addAndGet returns the UPDATED value, not the previous one
        updated = self.count.addAndGet( n )
        if updated == self.signal.get():
            self.sync.releaseShared( 0 )
        return updated
So my code now looks like this:
In the SwingWorker constructor:
self.publication_counter_latch = CounterLatch()
In SW.publish:
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )
In the thread waiting for chunk processing to stop:
worker.publication_counter_latch.await()
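And again for Java readers, a rough, untested Java sketch of the latch above; it implements only the "wait until the counter reaches the signal value" direction, without the lift_on_reached flag:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch: await() blocks until the counter reaches the signal value.
class CounterLatch {
    private final AtomicLong count;
    private final AtomicLong signal;
    private final AtomicBoolean released = new AtomicBoolean(false);

    private final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            // Negative return blocks the caller; positive lets it through.
            return (!released.get() && count.get() != signal.get()) ? -1 : 1;
        }
        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    private final Sync sync = new Sync();

    CounterLatch(long initial, long waitValue) {
        count = new AtomicLong(initial);
        signal = new AtomicLong(waitValue);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public long countRelative(long n) {
        long updated = count.addAndGet(n);
        if (updated == signal.get()) {
            sync.releaseShared(0); // lift the latch
        }
        return updated;
    }
}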
Upvotes: 0
Reputation: 2468
SHORT ANSWER:
This happens because publish() doesn't directly schedule process(); it sets a timer which will fire the scheduling of a process() block in the EDT after DELAY milliseconds. So when the worker is cancelled there is still a timer waiting to schedule a process() with the data of the last publish(). The reason for using a timer is to implement the optimization whereby a single process() may be executed with the combined data of several publishes.
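A quick way to observe this ordering yourself is to timestamp the two EDT callbacks in the question's worker; a minimal sketch:

@Override
protected void process(List<String> chunks) {
    // Runs in the EDT; can still fire once after done() below.
    System.out.println(System.currentTimeMillis() + " process(), isCancelled=" + isCancelled());
}

@Override
protected void done() {
    // Runs in the EDT as soon as cancel(true) is called.
    System.out.println(System.currentTimeMillis() + " done()");
}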
LONG ANSWER:
Let's see how publish() and cancel() interact with each other; for that, let us dive into some source code.
First the easy part, cancel(true):
public final boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
This cancel ends up calling the following code:
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED)) // <-----
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt(); // <-----
}
releaseShared(0);
done(); // <-----
return true;
}
The SwingWorker state is set to CANCELLED, the thread is interrupted and done() is called. However, this is not SwingWorker's done() but the future's done(), which is specified when the variable is instantiated in the SwingWorker constructor:
future = new FutureTask<T>(callable) {
@Override
protected void done() {
doneEDT(); // <-----
setState(StateValue.DONE);
}
};
And the doneEDT() code is:
private void doneEDT() {
Runnable doDone =
new Runnable() {
public void run() {
done(); // <-----
}
};
if (SwingUtilities.isEventDispatchThread()) {
doDone.run(); // <-----
} else {
doSubmit.add(doDone);
}
}
This calls SwingWorker's done() directly if we are in the EDT, which is our case. At this point the SwingWorker should stop; no more publish() calls should be made. This is easy enough to demonstrate with the following modification:
while(!isCancelled()) {
textArea.append("Calling publish\n");
publish("Writing...\n");
}
However, we still get a "Writing..." message from process(). So let us see how process() gets called. The source code for publish(...) is:
protected final void publish(V... chunks) {
synchronized (this) {
if (doProcess == null) {
doProcess = new AccumulativeRunnable<V>() {
@Override
public void run(List<V> args) {
process(args); // <-----
}
@Override
protected void submit() {
doSubmit.add(this); // <-----
}
};
}
}
doProcess.add(chunks); // <-----
}
We see that the run() of the Runnable doProcess is what ends up calling process(args), but this code just calls doProcess.add(chunks), not doProcess.run(), and there's a doSubmit involved too. Let's look at doProcess.add(chunks):
public final synchronized void add(T... args) {
boolean isSubmitted = true;
if (arguments == null) {
isSubmitted = false;
arguments = new ArrayList<T>();
}
Collections.addAll(arguments, args); // <-----
        if (!isSubmitted) { // this is why multiple publishes can result in a single process() execution
submit(); // <-----
}
}
So what publish() actually does is add the chunks to an internal ArrayList named arguments and call submit(). We just saw that submit() simply calls doSubmit.add(this), which is this very same add method, since both doProcess and doSubmit extend AccumulativeRunnable<V>; however, this time V is Runnable instead of String as in doProcess. So here a chunk is the runnable that calls process(args). The submit() call, though, is a completely different method, defined in the class of doSubmit:
private static class DoSubmitAccumulativeRunnable
extends AccumulativeRunnable<Runnable> implements ActionListener {
private final static int DELAY = (int) (1000 / 30);
@Override
protected void run(List<Runnable> args) {
for (Runnable runnable : args) {
runnable.run();
}
}
@Override
protected void submit() {
Timer timer = new Timer(DELAY, this); // <-----
timer.setRepeats(false);
timer.start();
}
public void actionPerformed(ActionEvent event) {
run(); // <-----
}
}
It creates a Timer that fires the actionPerformed code once, after DELAY milliseconds. Once the event is fired, the code is enqueued in the EDT, which will call an internal run() that ends up calling run(flush()) of doProcess and thus executing process(chunk), where chunk is the flushed data of the arguments ArrayList. I skipped some details; the chain of "run" calls is like this:

actionPerformed() -> doSubmit.run() -> doSubmit.run(flush()) (*) -> doProcess.run() -> doProcess.run(flush()) -> process(chunk)
(*) The boolean isSubmitted and flush() (which resets this boolean) make it so that additional calls to publish() don't add more doProcess runnables to be called in doSubmit.run(flush()); their data, however, is not ignored. Thus a single process() is executed for any number of publishes called during the life of a Timer.
All in all, what publish("Writing...") does is schedule the call to process(chunk) in the EDT after a DELAY. This explains why, even after we have cancelled the thread and no more publishes are done, one more process() execution still appears: at the moment we cancel the worker there is (with high probability) a Timer that will schedule a process() after done() is already scheduled.
Why is this Timer used instead of just scheduling process() in the EDT with an invokeLater(doProcess)? To implement the performance optimization explained in the docs:
Because the process method is invoked asynchronously on the Event Dispatch Thread multiple invocations to the publish method might occur before the process method is executed. For performance purposes all these invocations are coalesced into one invocation with concatenated arguments. For example:
publish("1"); publish("2", "3"); publish("4", "5", "6"); might result in: process("1", "2", "3", "4", "5", "6")
We now know that this works because all the publishes that occur within a DELAY interval add their args to the internal arguments ArrayList we saw, and the single process(chunk) then executes with all that data in one go.
IS THIS A BUG? WORKAROUND?
It's hard to tell if this is a bug or not. It might make sense to process the data that the background thread has published, since the work is actually done and you might be interested in updating the GUI with as much info as you can (if that's what process() is doing, for example). Then again, it might not make sense if done() requires all the data to have been processed, and/or if a call to process() after done() creates data or GUI inconsistencies.
There's an obvious workaround if you don't want any new process() to be executed after done(): simply check whether the worker is cancelled in the process method too!
@Override
protected void process(List<String> chunks) {
if (isCancelled()) return;
String string = chunks.get(chunks.size() - 1);
textArea.append(string);
}
It's trickier to make done() execute after that last process(); for example, done() could itself use a timer that schedules the actual done() work after more than DELAY milliseconds. Although I can't think this would be a common case: if you cancelled, it shouldn't be important to miss one more process() when we know that we are in fact cancelling the execution of all the future ones.
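A minimal sketch of that timer idea, applied to the question's done() (the 100 ms margin is an arbitrary value chosen to exceed the internal DELAY of 1000/30 ms; nothing in the API guarantees it):

@Override
protected void done() {
    // Defer the real done() work with a one-shot Swing Timer so that any
    // already-armed process() call gets to run first.
    javax.swing.Timer deferred = new javax.swing.Timer(100, new java.awt.event.ActionListener() {
        @Override
        public void actionPerformed(java.awt.event.ActionEvent e) {
            textArea.append("Stopped!\n"); // the actual done() work
        }
    });
    deferred.setRepeats(false);
    deferred.start();
}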
Upvotes: 18