jelle

Reputation: 770

Saving part of a collection while other threads add new entries

I have a batch job that sends HTTP requests asynchronously and collects the results in a list. This works perfectly.

public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    return this;
}

To optimize this, I want to store the lines in a persistent location as the list grows.

public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    if (lines.size() > uploadService.getPartSize()) {
        List<CsvLine> copy;
        copy = new ArrayList<>(lines);
        lines.removeAll(copy);
        uploadService.save(copy);
    }
    return this;
}

Multiple threads are still adding to the lines collection, but when making the copy I make sure to remove only the lines I'm going to save (the copy collection). This doesn't work, so I tried adding a synchronized block around this part:

public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    if (lines.size() > uploadService.getPartSize()) {
        List<CsvLine> copy;
        synchronized (this) {
            copy = new ArrayList<>(lines);
            lines.removeAll(copy);
        }
        uploadService.save(copy);
    }
    return this;
}

but without success. I assume the ArrayList copy constructor is not thread-safe; if I'm not mistaken, this is also the only part that needs to be synchronized.
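A side note on the removeAll step: ArrayList.removeAll removes every element that equals any element of the argument, not only the objects that were in the snapshot. The sketch below replays that interleaving in a single thread, assuming String in place of CsvLine; RemoveAllDemo is a hypothetical name. This only bites if CsvLine overrides equals so that lines with the same content compare equal; with the default identity-based equals, only the snapshotted objects would be removed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RemoveAllDemo {
    // Simulates the question's interleaving in one thread:
    // take a snapshot, let "another thread" add a line, then removeAll(snapshot).
    static List<String> afterRemoveAll() {
        List<String> lines = new ArrayList<>(Arrays.asList("a", "b"));
        List<String> copy = new ArrayList<>(lines); // snapshot to be saved: [a, b]
        lines.add("a");                             // an equal line arrives after the snapshot
        lines.removeAll(copy);                      // removes EVERY element equal to one in copy
        return lines;                               // empty: the new "a" is gone without being saved
    }
}
```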

Can someone point out what I'm doing wrong?

Upvotes: 1

Views: 49

Answers (2)

starikoff

Reputation: 1650

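Synchronize all access to lines (and only that). In the question's version, lines.add(line) and lines.size() run outside the synchronized block, so the lock does not stop other threads from modifying lines while the copy is being taken. Guard every read and write of lines with the same lock, and call the slow save outside it: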

public File addLine(CsvLine line) throws IOException {
    List<CsvLine> copy = null;
    synchronized (lines) {
        lines.add(line);
        if (lines.size() > uploadService.getPartSize()) {
            copy = new ArrayList<>(lines);
            lines.clear();
        }
    }
    if (copy != null) {
        uploadService.save(copy);
    }
    return this;
}
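Below is a self-contained sketch of the same pattern that can be exercised from several threads. It assumes String in place of CsvLine and an in-memory list in place of uploadService.save(...); BatchingBuffer, BatchingDemo and flush() are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the question's File class: String replaces CsvLine and
// uploadService.save(...) is simulated by recording each saved chunk in memory.
class BatchingBuffer {
    private final List<String> lines = new ArrayList<>();
    private final int partSize;
    // Saved chunks; a thread-safe list because saving happens outside the lock.
    final List<List<String>> saved = new CopyOnWriteArrayList<>();

    BatchingBuffer(int partSize) {
        this.partSize = partSize;
    }

    BatchingBuffer addLine(String line) {
        List<String> copy = null;
        synchronized (lines) {                 // the one lock that guards every access to lines
            lines.add(line);
            if (lines.size() > partSize) {
                copy = new ArrayList<>(lines); // snapshot taken under the lock
                lines.clear();                 // no other thread can add between copy and clear
            }
        }
        if (copy != null) {
            saved.add(copy);                   // the slow save would run here, outside the lock
        }
        return this;
    }

    // Drains the remaining lines, e.g. when the batch job finishes.
    List<String> flush() {
        synchronized (lines) {
            List<String> rest = new ArrayList<>(lines);
            lines.clear();
            return rest;
        }
    }
}

class BatchingDemo {
    // Runs `threads` writers that each add `perThread` distinct lines, then returns
    // every line that was either saved in a chunk or left over for the final flush.
    static List<String> run(int threads, int perThread, int partSize) {
        BatchingBuffer buffer = new BatchingBuffer(partSize);
        List<Thread> writers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    buffer.addLine(id + ":" + i);
                }
            });
            writers.add(w);
            w.start();
        }
        for (Thread w : writers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for writers", e);
            }
        }
        List<String> all = new ArrayList<>();
        for (List<String> chunk : buffer.saved) {
            all.addAll(chunk);
        }
        all.addAll(buffer.flush());
        return all;
    }
}
```

Because the check uses > (as in the question), it fires when the list holds partSize + 1 lines, so each saved chunk has partSize + 1 entries; use >= if you want chunks of exactly partSize. Any other method that touches lines must synchronize on the same object.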

Upvotes: 1

OldCurmudgeon

Reputation: 65813

I put together a DoubleBufferedList object to do just this.

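import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicMarkableReference;

/**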
 * Lock free - thread-safe.
 *
 * Write from many threads - read with fewer threads.
 *
 * Write items of type T.
 *
 * Read items of type List<T>.
 *
 * @author OldCurmudgeon
 * @param <T>
 */
public class DoubleBufferedList<T> {

    /**
     * Atomic reference so I can atomically swap it through.
     *
     * Mark = true means I am adding to it so momentarily unavailable for iteration.
     */
    private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

    // Factory method to create a new list - may be best to abstract this.
    protected List<T> newList() {
        return new ArrayList<>();
    }

    /**
     * Get and replace the current list.
     *
     * Used by readers.
     *
     * @return List<T> of a number (possibly 0) of items of type T.
     */
    public List<T> get() {
        // Atomically grab and replace the list with an empty one.
        List<T> empty = newList();
        List<T> it;
        // Replace an unmarked list with an empty one.
        if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
            // Failed to replace!
            // It is probably marked as being appended to but may have been replaced by another thread.
            // Return empty and come back again soon.
            return Collections.<T>emptyList();
        }
        // Successfully replaced an unmarked list with an empty list!
        return it;
    }

    /**
     * Grab and lock the list in preparation for append.
     *
     * Used by add.
     */
    private List<T> grab() {
        List<T> it;
        // We cannot fail so spin on get and mark.
        while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
            // Spin on mark - waiting for another grabber to release (which it must).
        }
        return it;
    }

    /**
     * Release the grabbed list.
     *
     * Opposite of grab.
     */
    private void release(List<T> it) {
        // Unmark it - should this be a compareAndSet(it, it, true, false)?
        if (!list.attemptMark(it, false)) {
            // Should never fail because once marked it will not be replaced.
            throw new IllegalMonitorStateException("It changed while we were adding to it!");
        }
    }

    /**
     * Add an entry to the list.
     *
     * Used by writers.
     *
     * @param entry - The new entry to add.
     */
    public void add(T entry) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entry.
            it.add(entry);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add many entries to the list.
     *
     * @param entries - The new entries to add.
     */
    public void add(List<T> entries) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entries.
            it.addAll(entries);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add a number of entries.
     *
     * @param entries - The new entries to add.
     */
    @SafeVarargs
    public final void add(T... entries) {
        // Make a list of them.
        add(Arrays.<T>asList(entries));
    }

}
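The class leans on two documented properties of AtomicMarkableReference: compareAndSet succeeds only when both the reference (compared with ==) and the mark match the expected values, and attemptMark changes only the mark. The following single-threaded trace of grab, a failed get, release and a retried get illustrates this; MarkTraceDemo is a hypothetical name, and plain ArrayLists stand in for the buffered lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicMarkableReference;

class MarkTraceDemo {
    // Returns the outcome of each step, in order, plus whether the swap took effect.
    static boolean[] trace() {
        List<String> current = new ArrayList<>();
        List<String> empty = new ArrayList<>();
        AtomicMarkableReference<List<String>> ref = new AtomicMarkableReference<>(current, false);

        // grab(): mark the current list so readers leave it alone (mark false -> true).
        boolean grabbed = ref.compareAndSet(current, current, false, true);
        current.add("x"); // the writer appends while holding the mark

        // get(): only an unmarked list may be swapped out, so this attempt fails.
        boolean swappedWhileMarked = ref.compareAndSet(current, empty, false, false);

        // release(): clear the mark; succeeds because the reference is still `current`.
        boolean released = ref.attemptMark(current, false);

        // get() retried: the list is unmarked now, so the swap succeeds.
        boolean swappedAfterRelease = ref.compareAndSet(current, empty, false, false);

        return new boolean[] {grabbed, swappedWhileMarked, released, swappedAfterRelease,
                ref.getReference() == empty};
    }
}
```

Since both get() and grab() require the mark to be false, nothing can replace the reference while a writer holds it, which is why release() treats a failed attemptMark as an error.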

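Write to it much as you would to a normal List (although it does not implement the List interface). get() atomically swaps the current list for an empty one and returns the old one, so the caller receives a list of items that no writer will touch again and can process it safely.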

final String BYE = "BYE!";

public void test() throws InterruptedException {
    final DoubleBufferedList<String> l = new DoubleBufferedList<>();
    // Producer.
    Thread p = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100000; i++) {
                // Pump the queue.
                l.add("" + i);
            }
            l.add(BYE);
        }
    });
    // Consumer.
    Thread c = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean done = false;
            do {
                List<String> got = l.get();
                System.out.println("Got: " + got);
                for (String s : got) {
                    if (s.equals(BYE)) {
                        done = true;
                    }
                }
            } while (!done);
        }
    });
    // Fire it up.
    p.start();
    c.start();
    // Wait for them to finish.
    p.join();
    c.join();
}

Upvotes: 1
