Luca Fülbier

Reputation: 2731

Unit testing asynchronous computation that has to be interrupted manually

I have a class that records eye-tracking data asynchronously. There are methods to start and stop the recording process. The data is collected in a collection, and the collection can only be accessed once the recording thread has finished its work. It basically encapsulates all the threading and synchronizing so the user of my library doesn't have to do it.

The heavily shortened code (generics and error handling omitted):

public class Recorder {
  private Collection accumulatorCollection;
  private Thread recordingThread;

  private class RecordingRunnable implements Runnable {
    ...

    public void run() {
      while(!Thread.currentThread().isInterrupted()) {
        // fetch data and collect it in the accumulator
        synchronized(acc) { acc.add(Eyetracker.getData()); }
      }
    }
  }

  public void start() {
    accumulatorCollection = new ArrayList(); // Collection is an interface and cannot be instantiated
    recordingThread = new Thread(new RecordingRunnable(accumulatorCollection));
    recordingThread.start();
  }

  public void stop() {
    recordingThread.interrupt();
  }

  public Collection getData() {
    try {
      recordingThread.join(2000);
      // still alive after the timeout: the recording thread did not stop
      if(recordingThread.isAlive()) { throw new IllegalStateException(); }
    }
    catch(InterruptedException e) { ... }

    synchronized(accumulatorCollection) { return accumulatorCollection; }
  }
}

The usage is quite simple:

recorder.start();
...
recorder.stop();
Collection data = recorder.getData();

My problem with the whole thing is how to test it. Currently I am doing it like this:

recorder.start();
Thread.sleep(50);
recorder.stop();
Collection data = recorder.getData();
assert(stuff);

This works, but it is non-deterministic and slows down the test suite quite a bit (I marked these tests as integration tests, so they have to be run separately to circumvent this problem).

Is there a better way?

Upvotes: 1

Views: 136

Answers (1)

vanOekel

Reputation: 6548

There is a better way using a CountDownLatch.

The non-deterministic part of the test stems from two variables in time you do not account for:

  • Creating and starting a thread takes time, and the thread may not have started executing the runnable when Thread.start() returns (the runnable will be executed, but possibly a bit later).
  • The stop/interrupt will break the while-loop in the Runnable, but not necessarily immediately; it may happen a bit later.

This is where a CountDownLatch comes in: it gives you precise information about where another thread is in its execution. For example, let the first thread wait on the latch while the second thread counts down the latch as the last statement of its runnable; once the wait returns, the first thread knows that the runnable has finished. The CountDownLatch also acts as a synchronizer: whatever the second thread wrote to memory before counting down can now be read by the first thread.
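
The hand-off described above can be sketched in isolation. This is only a minimal illustration, not the recorder itself; the class name LatchHandoffDemo and the method collectOnWorker are made up for this example. A worker fills a plain ArrayList and counts the latch down as its last statement, and the calling thread reads the list only after await() has returned:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the Recorder.
class LatchHandoffDemo {

    /** Fills a list on a worker thread and returns it once the worker has signalled completion. */
    static List<Integer> collectOnWorker(int count) {
        final List<Integer> results = new ArrayList<>();        // deliberately not thread-safe
        final CountDownLatch finished = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    results.add(i);
                }
            } finally {
                finished.countDown();   // last statement: signals that the runnable is done
            }
        });
        worker.start();

        try {
            // Blocks until countDown() was called; the worker's writes are visible afterwards.
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(collectOnWorker(5));   // prints [0, 1, 2, 3, 4]
    }
}
```

No sleep is needed here: the caller proceeds exactly when the worker has finished, neither earlier nor later.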

Instead of using an interrupt, you can also use a volatile boolean. Any thread reading the volatile variable is guaranteed to see the last value set by any other thread.
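
As a rough sketch of the volatile-flag variant (the names VolatileStopFlag, requestStop and stopsWithin are invented for this illustration), a worker loops until another thread sets the flag. The join with a timeout is only a safety net so that the example cannot hang:

```java
// Hypothetical demo class, not part of the Recorder.
class VolatileStopFlag {

    // volatile: a write by one thread is visible to later reads in other threads
    private volatile boolean stopRequested;

    void requestStop() {
        stopRequested = true;
    }

    /** Starts a worker that loops until the flag is set; returns true if it ended within timeoutMillis. */
    static boolean stopsWithin(long timeoutMillis) {
        VolatileStopFlag flag = new VolatileStopFlag();
        Thread worker = new Thread(() -> {
            while (!flag.stopRequested) {
                Thread.yield();   // stands in for "fetch data and collect it"
            }
        });
        worker.setDaemon(true);   // do not keep the JVM alive if something goes wrong
        worker.start();

        flag.requestStop();       // plays the role of recordingThread.interrupt()
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopsWithin(2000));
    }
}
```

Without the volatile modifier the worker would not be guaranteed to ever see the updated flag.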

A CountDownLatch can also be given a timeout, which is useful for tests that can hang: if you have to wait too long, you can abort the whole test (e.g. shut down executors, interrupt threads) and throw an AssertionError. In the code below I re-used the timeout to wait for a certain amount of data to collect instead of 'sleeping'.
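
One way the timeout idea could look in a test helper is sketched here. The names LatchTimeoutDemo, awaitOrFail and finishesInTime are assumptions made for this example: the helper waits on the latch for a bounded time and, if the worker has not finished, interrupts it and fails with an AssertionError.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the Recorder.
class LatchTimeoutDemo {

    /** Waits for the latch; on timeout, interrupts the worker and fails with an AssertionError. */
    static void awaitOrFail(CountDownLatch done, Thread worker, long timeout, TimeUnit unit) {
        boolean finished;
        try {
            finished = done.await(timeout, unit);   // false if the timeout elapsed first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        if (!finished) {
            worker.interrupt();                     // abort the hanging work
            throw new AssertionError("worker did not finish within " + timeout + " " + unit);
        }
    }

    /** Returns true if a worker that needs workMillis finishes within timeoutMillis. */
    static boolean finishesInTime(long workMillis, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMillis);           // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            awaitOrFail(done, worker, timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(finishesInTime(10, 2000));   // fast worker
        System.out.println(finishesInTime(5000, 50));   // worker exceeds the timeout
    }
}
```

The timeout here is an upper bound for the failure case only; a healthy test returns as soon as the latch is counted down.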

As an optimization, use an Executor (thread pool) instead of creating and starting threads. The latter is relatively expensive, so using an Executor can really make a difference.
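
A small sketch of the executor idea, under the assumption that many short tasks are run one after another (SharedExecutorDemo and runOnSharedExecutor are illustrative names): all tasks reuse the single pool thread instead of each getting a freshly created Thread, and each task reports completion through its own latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the Recorder.
class SharedExecutorDemo {

    /** Runs the given number of short tasks on one reused pool thread; returns how many completed. */
    static int runOnSharedExecutor(int tasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int completed = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                CountDownLatch done = new CountDownLatch(1);
                executor.execute(done::countDown);       // the "work" is just signalling completion
                if (done.await(2, TimeUnit.SECONDS)) {
                    completed++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();                      // always release the pool thread
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("completed " + runOnSharedExecutor(20) + " tasks");
    }
}
```

In a test suite the executor would typically be created once per test class and shut down afterwards, so the thread start-up cost is paid only once.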

Below is the updated code; I made it runnable as an application (main method). (edit 28/02/17: check maxCollect > 0 in while-loop)

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class Recorder {

    private final ExecutorService executor;
    private Thread recordingThread;
    private volatile boolean stopRecording;
    private CountDownLatch finishedRecording;
    private Collection<Object> eyeData;
    private int maxCollect;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean stopped = new AtomicBoolean();

    public Recorder() {
        this(null);
    }

    public Recorder(ExecutorService executor) {
        this.executor = executor;
    }

    public Recorder maxCollect(int max) { maxCollect = max; return this; }

    private class RecordingRunnable implements Runnable {

        @Override public void run() {

            try {
                int collected = 0;
                while (!stopRecording) {
                    eyeData.add(EyeTracker.getData());
                    if (maxCollect > 0 && ++collected >= maxCollect) {
                        stopRecording = true;
                    }
                }
            } finally {
                finishedRecording.countDown();
            }
        }
    }

    public Recorder start() {

        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("already started");
        }
        stopRecording = false;
        finishedRecording = new CountDownLatch(1);
        eyeData = new ArrayList<Object>();
        // the RecordingRunnable created below will see the values assigned above ('happens before relationship')
        if (executor == null) {
            recordingThread = new Thread(new RecordingRunnable());
            recordingThread.start();
        } else {
            executor.execute(new RecordingRunnable());
        }
        return this;
    }

    public Collection<Object> getData(long timeout, TimeUnit tunit) {

        if (!started.get()) {
            throw new IllegalStateException("start first");
        }
        if (!stopped.compareAndSet(false, true)) {
            throw new IllegalStateException("data already fetched");
        }
        if (maxCollect <= 0) {
            stopRecording = true;
        }
        boolean recordingStopped = false;
        try {
            // this establishes a 'happens before relationship'
            // all updates to eyeData are now visible in this thread.
            recordingStopped = finishedRecording.await(timeout, tunit);
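        } catch(InterruptedException e) {
            // restore the interrupt flag so callers can still detect the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);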
        } finally {
            stopRecording = true;
        }
        // if recording did not stop, do not return the eyeData (could still be modified by recording-runnable).
        if (!recordingStopped) {
            throw new RuntimeException("recording did not stop within the timeout");
        }
        // only when everything is OK this recorder instance can be re-used
        started.set(false);
        stopped.set(false);
        return eyeData;
    }

    public static class EyeTracker {

        public static Object getData() {
            try { Thread.sleep(1); } catch (Exception ignored) {}
            return new Object();
        }
    }

    public static void main(String[] args) {

        System.out.println("Starting.");
        ExecutorService exe = Executors.newSingleThreadExecutor();
        try { 
            Recorder r = new Recorder(exe).maxCollect(50).start();
            int dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
            System.out.println("Collected "  + dsize);
            r.maxCollect(100).start();
            dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
            System.out.println("Collected "  + dsize);
            r.maxCollect(0).start();
            Thread.sleep(100);
            dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
            System.out.println("Collected "  + dsize);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exe.shutdownNow();
            System.out.println("Done.");
        }
    }
}

Happy coding :)

Upvotes: 2
