dic19

Reputation: 17971

SwingWorker, done() is executed before process() calls are finished

I have been working with SwingWorkers for a while and have ended up with a strange behavior, at least for me. I clearly understand that, for performance reasons, several invocations of the publish() method are coalesced into one invocation. That makes perfect sense to me, and I suspect SwingWorker keeps some kind of queue to process all those calls.

According to the tutorial and the API, when a SwingWorker ends its execution, either because doInBackground() finishes normally or because the worker thread is cancelled from the outside, the done() method is invoked. So far so good.

But I have an example (similar to the ones shown in the tutorials) where process() is invoked after done() has already been executed. Since both methods execute on the Event Dispatch Thread, I would expect done() to be executed after all the process() invocations have finished. In other words:

Expected:

Writing...
Writing...
Stopped!

Result:

Writing...
Stopped!
Writing...

Sample code

import java.awt.BorderLayout;
import java.awt.Dimension;
import java.awt.Graphics;
import java.awt.event.ActionEvent;
import java.util.List;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

public class Demo {

    private SwingWorker<Void, String> worker;
    private JTextArea textArea;
    private Action startAction, stopAction;

    private void createAndShowGui() {

        startAction = new AbstractAction("Start writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.startWriting();
                this.setEnabled(false);
                stopAction.setEnabled(true);
            }
        };

        stopAction = new AbstractAction("Stop writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.stopWriting();
                this.setEnabled(false);
                startAction.setEnabled(true);
            }
        };

        JPanel buttonsPanel = new JPanel();
        buttonsPanel.add(new JButton(startAction));
        buttonsPanel.add(new JButton(stopAction));

        textArea = new JTextArea(30, 50);
        JScrollPane scrollPane = new JScrollPane(textArea);

        JFrame frame = new JFrame("Test");
        frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        frame.add(scrollPane);
        frame.add(buttonsPanel, BorderLayout.SOUTH);
        frame.pack();
        frame.setLocationRelativeTo(null);
        frame.setVisible(true);
    }

    private void startWriting() {
        stopWriting();
        worker = new SwingWorker<Void, String>() {
            @Override
            protected Void doInBackground() throws Exception {
                while(!isCancelled()) {
                    publish("Writing...\n");
                }
                return null;
            }

            @Override
            protected void process(List<String> chunks) {
                String string = chunks.get(chunks.size() - 1);
                textArea.append(string);
            }

            @Override
            protected void done() {
                textArea.append("Stopped!\n");
            }
        };
        worker.execute();
    }

    private void stopWriting() {
        if(worker != null && !worker.isCancelled()) {
            worker.cancel(true);
        }
    }

    public static void main(String[] args) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                new Demo().createAndShowGui();
            }
        });
    }
}

Upvotes: 20

Views: 5284

Answers (2)

mike rodent

Reputation: 15632

Having read DSquare's superb answer, and concluded from it that some subclassing would be needed, I've come up with this idea for anyone who needs to make sure all published chunks have been processed in the EDT before moving on.

NB I tried to write it in Java rather than Jython (my language of choice and officially the best language in the world), but it is a bit complicated because, for example, publish is final, so you'd have to invent another method to call it, and also because you have to (yawn) parameterise everything with generics in Java.

This code should be understandable by any Java person. One hint: self.publication_counter.get() evaluates to False when the result is 0.

import time
import javax.swing
import java.util.concurrent.atomic

# this is how you say "Worker... is a subclass of SwingWorker" in Python/Jython
class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ):

    # __init__ is the constructor method in Python/Jython
    def __init__( self ):

        # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object
        self.publication_counter = java.util.concurrent.atomic.AtomicInteger()

    def await_processing_of_all_chunks( self ):
        while self.publication_counter.get():
            time.sleep( 0.001 )

    # fully functional override of the Java method     
    def process( self, chunks ):
        for chunk in chunks:
            pass
            # DO SOMETHING WITH EACH CHUNK

        # decrement the counter by the number of chunks received
        # NB do this AFTER dealing with the chunks 
        self.publication_counter.addAndGet( - len( chunks ) )

    # fully functional override of the Java method     
    def publish( self, *chunks ):
        # increment the counter by the number of chunks received
        # NB do this BEFORE publishing the chunks
        self.publication_counter.addAndGet( len( chunks ))
        self.super__publish( chunks )

So in your calling code, you put something like:

    engine.update_xliff_task.get()
    engine.update_xliff_task.await_processing_of_all_chunks()

PS the use of a while clause like this (i.e. a polling technique) is hardly elegant. I looked at the available java.util.concurrent classes such as CountDownLatch and Phaser (both with thread-blocking methods), but I don't think either would suit this purpose...
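For Java readers, a rough sketch of the same counter idea might look like the following (untested; the class and method names are invented for illustration, and publishAndCount() exists only because publish() itself is final):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.swing.SwingWorker;

// Hypothetical Java version of the counting worker above (a sketch, not a
// definitive implementation).
abstract class CountingSwingWorker<T, V> extends SwingWorker<T, V> {

    private final AtomicInteger publicationCounter = new AtomicInteger();

    // publish() is final, so callers use this wrapper instead
    protected final void publishAndCount(V... chunks) {
        publicationCounter.addAndGet(chunks.length); // increment BEFORE publishing
        publish(chunks);
    }

    // crude polling, exactly as in the Jython version
    public void awaitProcessingOfAllChunks() throws InterruptedException {
        while (publicationCounter.get() != 0) {
            Thread.sleep(1);
        }
    }

    @Override
    protected void process(List<V> chunks) {
        processChunks(chunks);
        publicationCounter.addAndGet(-chunks.size()); // decrement AFTER processing
    }

    // subclasses do their actual EDT work here
    protected abstract void processChunks(List<V> chunks);
}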

later

I was interested enough in this to tweak a proper concurrency class (written in Java, found on the Apache site) called CounterLatch. Their version stops the thread at await() if a given value of an AtomicLong counter is reached. My version here allows you either to do that, or the opposite: to say "wait until the counter reaches a certain value before lifting the latch":

NB signal is an AtomicLong and released an AtomicBoolean because the original Java uses the volatile keyword; I think using the atomic classes achieves the same purpose.

import java.util.concurrent
import java.util.concurrent.atomic
import java.util.concurrent.locks

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( sync_self, args ):
                return True

        # create released BEFORE the Sync instance, so the synchronizer never
        # sees a half-initialised object
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False
        self.sync = Sync()

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        # NB AtomicLong.addAndGet returns the UPDATED value, not the previous one
        updated = self.count.addAndGet( n )
        if updated == self.signal.get():
            self.sync.releaseShared( 0 )
        return updated

So my code now looks like this:

In the SwingWorker constructor:

self.publication_counter_latch = CounterLatch() 

In SW.publish:

self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

In the thread waiting for chunk processing to stop:

worker.publication_counter_latch.await()
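Again for Java readers, a rough, untested translation of the CounterLatch above might look like this (same structure, Java naming conventions):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a Java equivalent of the Jython CounterLatch above.
class CounterLatch {

    private final AtomicLong count;
    private final AtomicLong signal;
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final boolean liftOnReached;
    private final Sync sync = new Sync();

    CounterLatch(long initial, long waitValue, boolean liftOnReached) {
        this.count = new AtomicLong(initial);
        this.signal = new AtomicLong(waitValue);
        this.liftOnReached = liftOnReached;
    }

    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            boolean reached = count.get() == signal.get();
            // block while the lift condition is not met and the latch
            // has not been released explicitly
            boolean blocked = liftOnReached ? !reached : reached;
            return (!released.get() && blocked) ? -1 : 1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    long countRelative(long n) {
        long updated = count.addAndGet(n); // addAndGet returns the updated value
        if (updated == signal.get()) {
            sync.releaseShared(0);
        }
        return updated;
    }
}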

Upvotes: 0

DSquare

Reputation: 2468

SHORT ANSWER:

This happens because publish() doesn't directly schedule process(); it sets a timer which will fire the scheduling of a process() block on the EDT after DELAY milliseconds. So when the worker is cancelled there is still a timer waiting to schedule a process() with the data of the last publish(). The reason for using a timer is to implement the optimization whereby a single process() may be executed with the combined data of several publish() calls.

LONG ANSWER:

Let's see how publish() and cancel() interact with each other. For that, let us dive into some source code.

First the easy part, cancel(true):

public final boolean cancel(boolean mayInterruptIfRunning) {
    return future.cancel(mayInterruptIfRunning);
}

This cancel ends up calling the following code:

boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED)) // <-----
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)
            r.interrupt(); // <-----
    }
    releaseShared(0);
    done(); // <-----
    return true;
}

The SwingWorker state is set to CANCELLED, the thread is interrupted and done() is called. However, this is not SwingWorker's done() but the future's done(), which is overridden when the future field is instantiated in the SwingWorker constructor:

future = new FutureTask<T>(callable) {
    @Override
    protected void done() {
        doneEDT();  // <-----
        setState(StateValue.DONE);
    }
};

And the doneEDT() code is:

private void doneEDT() {
    Runnable doDone =
        new Runnable() {
            public void run() {
                done(); // <-----
            }
        };
    if (SwingUtilities.isEventDispatchThread()) {
        doDone.run(); // <-----
    } else {
        doSubmit.add(doDone);
    }
}

This calls the SwingWorker's done() directly if we are in the EDT, which is our case. At this point the SwingWorker should stop and no more publish() calls should happen; this is easy enough to demonstrate with the following modification:

while(!isCancelled()) {
    textArea.append("Calling publish\n");
    publish("Writing...\n");
}

However, we still get a "Writing..." message from process(). So let us see how process() is called. The source code for publish(...) is:

protected final void publish(V... chunks) {
    synchronized (this) {
        if (doProcess == null) {
            doProcess = new AccumulativeRunnable<V>() {
                @Override
                public void run(List<V> args) {
                    process(args); // <-----
                }
                @Override
                protected void submit() {
                    doSubmit.add(this); // <-----
                }
            };
        }
    }
    doProcess.add(chunks);  // <-----
}

We see that the run() of the Runnable doProcess is what ends up calling process(args), but this code just calls doProcess.add(chunks), not doProcess.run(), and there's a doSubmit around too. Let's look at doProcess.add(chunks):

public final synchronized void add(T... args) {
    boolean isSubmitted = true;
    if (arguments == null) {
        isSubmitted = false;
        arguments = new ArrayList<T>();
    }
    Collections.addAll(arguments, args); // <-----
    if (!isSubmitted) { // this is what makes multiple publishes result in a single process() execution
        submit(); // <-----
    }
}

So what publish() actually does is add the chunks to an internal ArrayList, arguments, and call submit(). We just saw that submit() simply calls doSubmit.add(this), which is this very same add() method, since both doProcess and doSubmit extend AccumulativeRunnable<V>; this time around, however, V is Runnable instead of String as in doProcess. So here a chunk is the runnable that calls process(args). The submit() call, though, is a completely different method, defined in the class of doSubmit:

private static class DoSubmitAccumulativeRunnable
     extends AccumulativeRunnable<Runnable> implements ActionListener {
    private final static int DELAY = (int) (1000 / 30);
    @Override
    protected void run(List<Runnable> args) {
        for (Runnable runnable : args) {
            runnable.run();
        }
    }
    @Override
    protected void submit() {
        Timer timer = new Timer(DELAY, this); // <-----
        timer.setRepeats(false);
        timer.start();
    }
    public void actionPerformed(ActionEvent event) {
        run(); // <-----
    }
}

It creates a Timer that fires the actionPerformed code once, after DELAY milliseconds. Once the event fires, the code is enqueued on the EDT, which calls an internal run() that ends up calling run(flush()) of doProcess and thus executing process(chunk), where chunk is the flushed data of the arguments ArrayList. I skipped some details; the chain of "run" calls is like this:

  • doSubmit.run()
  • doSubmit.run(flush()) //Actually a loop of runnables but will only have one (*)
  • doProcess.run()
  • doProcess.run(flush())
  • process(chunk)

(*) The boolean isSubmitted and flush() (which resets that boolean) ensure that additional calls to publish() don't add further doProcess runnables to be run in doSubmit.run(flush()); their data, however, is not ignored. Thus a single process() is executed for any number of publishes made during the life of one Timer.
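For reference, the run()/flush() pair that ties this chain together looks roughly like this (paraphrased from sun.swing.AccumulativeRunnable in the OpenJDK sources):

// run() drains the accumulated arguments and hands them to run(List<T>)
public final void run() {
    run(flush());
}

// returns the accumulated arguments and resets the list to null,
// so the next add(...) triggers a fresh submit()
private final synchronized List<T> flush() {
    List<T> list = arguments;
    arguments = null;
    return list;
}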

All in all, what publish("Writing...") does is schedule the call to process(chunk) on the EDT after a DELAY. This explains why, even after we have cancelled the thread and no more publishes are made, one more process() execution still appears: at the moment we cancel the worker there is (with high probability) a Timer that will schedule a process() after done() has already been scheduled.

Why is this Timer used instead of just scheduling process() in the EDT with an invokeLater(doProcess)? To implement the performance optimization explained in the docs:

Because the process method is invoked asynchronously on the Event Dispatch Thread multiple invocations to the publish method might occur before the process method is executed. For performance purposes all these invocations are coalesced into one invocation with concatenated arguments. For example:

 publish("1");
 publish("2", "3");
 publish("4", "5", "6");

might result in:
 process("1", "2", "3", "4", "5", "6")

We now know that this works because all the publishes that occur within a DELAY interval add their args to that internal variable we saw, arguments, and process(chunk) then executes with all that data in one go.

IS THIS A BUG? WORKAROUND?

It's hard to tell if this is a bug or not. It might make sense to process the data that the background thread has published, since the work is actually done and you might be interested in updating the GUI with as much info as you can (if that's what process() is doing, for example). And then it might not make sense if done() requires all the data to have been processed and/or a call to process() after done() creates data/GUI inconsistencies.

There's an obvious workaround if you don't want any new process() to be executed after done(): simply check whether the worker is cancelled in the process method too!

@Override
protected void process(List<String> chunks) {
    if (isCancelled()) return;
    String string = chunks.get(chunks.size() - 1);
    textArea.append(string);
}

It's trickier to make done() execute after that last process(); for example, done() could itself use a timer that schedules the actual done() work after more than DELAY milliseconds. Although I can't think this would be a common case: if you cancelled, it shouldn't be important to miss one more process() when we know that we are in fact cancelling the execution of all future ones.
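Applied to the question's example, a sketch of that timer idea could look like this (assuming a 100 ms delay, comfortably above the internal ~33 ms DELAY, is acceptable):

@Override
protected void done() {
    // postpone the real done() work until after SwingWorker's internal
    // DELAY has elapsed, so that any pending process() call runs first
    javax.swing.Timer timer = new javax.swing.Timer(100, new java.awt.event.ActionListener() {
        @Override
        public void actionPerformed(java.awt.event.ActionEvent e) {
            textArea.append("Stopped!\n");
        }
    });
    timer.setRepeats(false);
    timer.start();
}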

Upvotes: 18
