Reputation: 770
I have a batch job that sends out HTTP requests asynchronously and collects the results in a list. This works perfectly:
public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    return this;
}
When trying to optimize this, I want the lines to be stored in a persistent location:
public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    if (lines.size() > uploadService.getPartSize()) {
        List<CsvLine> copy;
        copy = new ArrayList<>(lines);
        lines.removeAll(copy);
        uploadService.save(copy);
    }
    return this;
}
Multiple threads are still adding to the lines collection, but by making a copy I make sure that I only remove the lines I'm going to save (the copy collection). This doesn't work, so I tried wrapping this part in a synchronized block:
public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    if (lines.size() > uploadService.getPartSize()) {
        List<CsvLine> copy;
        synchronized (this) {
            copy = new ArrayList<>(lines);
            lines.removeAll(copy);
        }
        uploadService.save(copy);
    }
    return this;
}
But without any success. I assume the constructor of ArrayList is not thread-safe, and this is also the only part that needs to be synchronized, if I'm not mistaken.
Can someone point out what I'm doing wrong?
Upvotes: 1
Views: 49
Reputation: 1650
Synchronize all access to lines (and only it):
public File addLine(CsvLine line) throws IOException {
    List<CsvLine> copy = null;
    synchronized (lines) {
        lines.add(line);
        if (lines.size() > uploadService.getPartSize()) {
            copy = new ArrayList<>(lines);
            lines.clear();
        }
    }
    if (copy != null) {
        uploadService.save(copy);
    }
    return this;
}
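To check that this pattern neither drops nor duplicates lines under contention, here is a minimal self-contained sketch. BatchingCollector, flush, run and the counting save method are hypothetical stand-ins for the question's File class and uploadService, which are not shown. Note that the last partial batch stays in lines until something flushes it; the flush method is only an assumption about how that could be handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the question's File class; all names are illustrative. */
final class BatchingCollector {
    private final List<String> lines = new ArrayList<>();
    private final int partSize;
    // Stand-in for uploadService.save: only counts how many lines were saved.
    private final AtomicInteger saved = new AtomicInteger();

    BatchingCollector(int partSize) {
        this.partSize = partSize;
    }

    void addLine(String line) {
        List<String> copy = null;
        // Add, size check, copy and clear all happen under the same lock.
        synchronized (lines) {
            lines.add(line);
            if (lines.size() > partSize) {
                copy = new ArrayList<>(lines);
                lines.clear();
            }
        }
        // The slow save runs outside the lock so other writers are not blocked.
        if (copy != null) {
            save(copy);
        }
    }

    /** Assumed helper: saves whatever is left once all writers are done. */
    void flush() {
        List<String> copy;
        synchronized (lines) {
            copy = new ArrayList<>(lines);
            lines.clear();
        }
        if (!copy.isEmpty()) {
            save(copy);
        }
    }

    private void save(List<String> batch) {
        saved.addAndGet(batch.size());
    }

    /** Starts the given number of writer threads and returns the total number of saved lines. */
    static int run(int threads, int perThread, int partSize) {
        BatchingCollector collector = new BatchingCollector(partSize);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    collector.addLine("line " + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for writers", e);
            }
        }
        collector.flush();
        return collector.saved.get();
    }
}
```

Calling BatchingCollector.run(8, 10000, 100) should report exactly as many saved lines as were added. In the question's version, lines.add(line) runs outside the synchronized block, so another thread can add to the list while it is being copied and removed.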
Upvotes: 1
Reputation: 65813
I put together a DoubleBufferedList object to do just this.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicMarkableReference;

/**
 * Lock free - thread-safe.
 *
 * Write from many threads - read with fewer threads.
 *
 * Write items of type T.
 *
 * Read items of type List<T>.
 *
 * @author OldCurmudgeon
 * @param <T>
 */
public class DoubleBufferedList<T> {
    /**
     * Atomic reference so I can atomically swap it through.
     *
     * Mark = true means I am adding to it so momentarily unavailable for iteration.
     */
    private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

    // Factory method to create a new list - may be best to abstract this.
    protected List<T> newList() {
        return new ArrayList<>();
    }

    /**
     * Get and replace the current list.
     *
     * Used by readers.
     *
     * @return List<T> of a number (possibly 0) of items of type T.
     */
    public List<T> get() {
        // Atomically grab and replace the list with an empty one.
        List<T> empty = newList();
        List<T> it;
        // Replace an unmarked list with an empty one.
        if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
            // Failed to replace!
            // It is probably marked as being appended to but may have been replaced by another thread.
            // Return empty and come back again soon.
            return Collections.<T>emptyList();
        }
        // Successfully replaced an unmarked list with an empty list!
        return it;
    }

    /**
     * Grab and lock the list in preparation for append.
     *
     * Used by add.
     */
    private List<T> grab() {
        List<T> it;
        // We cannot fail so spin on get and mark.
        while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
            // Spin on mark - waiting for another grabber to release (which it must).
        }
        return it;
    }

    /**
     * Release the grabbed list.
     *
     * Opposite of grab.
     */
    private void release(List<T> it) {
        // Unmark it. compareAndSet is used because attemptMark is documented as allowed to fail spuriously.
        if (!list.compareAndSet(it, it, true, false)) {
            // Should never fail because once marked it will not be replaced.
            throw new IllegalMonitorStateException("It changed while we were adding to it!");
        }
    }

    /**
     * Add an entry to the list.
     *
     * Used by writers.
     *
     * @param entry - The new entry to add.
     */
    public void add(T entry) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entry.
            it.add(entry);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add many entries to the list.
     *
     * @param entries - The new entries to add.
     */
    public void add(List<T> entries) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entries.
            it.addAll(entries);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add a number of entries.
     *
     * @param entries - The new entries to add.
     */
    @SafeVarargs
    public final void add(T... entries) {
        // Make a list of them.
        add(Arrays.<T>asList(entries));
    }
}
Use it as a normal List, but get returns a List<T> containing a safe list of items that are available for processing.
final String BYE = "BYE!";

public void test() throws InterruptedException {
    final DoubleBufferedList<String> l = new DoubleBufferedList<>();
    // Producer.
    Thread p = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100000; i++) {
                // Pump the queue.
                l.add("" + i);
            }
            l.add(BYE);
        }
    });
    // Consumer.
    Thread c = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean done = false;
            do {
                List<String> got = l.get();
                System.out.println("Got: " + got);
                for (String s : got) {
                    if (s.equals(BYE)) {
                        done = true;
                    }
                }
            } while (!done);
        }
    });
    // Fire it up.
    p.start();
    c.start();
    // Wait for them to finish.
    p.join();
    c.join();
}
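For comparison, a similar "many writers add, a reader takes everything available so far" hand-off can be sketched with the JDK's own BlockingQueue.drainTo. This is a different technique from DoubleBufferedList, not a reimplementation of it, and the class and method names below (DrainToSketch, run) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: batch hand-off using a standard blocking queue. */
final class DrainToSketch {

    /** Starts the producers, drains batches on the calling thread, returns the number of items consumed. */
    static int run(int producers, int perProducer) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    // The queue is unbounded, so add never rejects an element here.
                    queue.add("item " + i);
                }
            });
            threads.add(t);
            t.start();
        }

        int total = producers * perProducer;
        int consumed = 0;
        List<String> batch = new ArrayList<>();
        while (consumed < total) {
            batch.clear();
            // Moves every element currently available from the queue into batch.
            queue.drainTo(batch);
            consumed += batch.size();
            if (batch.isEmpty()) {
                // Nothing available yet; let the producers run.
                Thread.yield();
            }
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for producers", e);
            }
        }
        return consumed;
    }
}
```

Each drained batch is a plain list that only the consumer touches, so it can be processed or saved without further locking.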
Upvotes: 1