Reputation: 24518
I was playing with the ForkJoin
framework and Java 8 accumulateAndGet
function. The following program produced an output that I couldn't explain. Can you?
P.S: This is not a homework. This is an exercise from the book "Java SE 8 for the Really Impatient" by Cay S. Horstmann. It's a good book.
/**
* Q1: Write a program that keeps track of the longest string that is observed by a number of threads. Use an
* {@code AtomicReference} and an appropriate accumulator.
*
* @param observed
* Longest string.
* @param x
* String value to be compared to the longest value.
* @return Longest string.
*/
public static String updateLongestString(final AtomicReference<String> observed, final String x) {
LOGGER.info("Received observed: {}, x: {}", observed, x);
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> observed.get().length() - x.length()));
LOGGER.info("New observed: {}.", longestString);
return longestString;
}
Unit test:
@Test
public void testUpdateLongestString() {
final String[] words = new String[] { "Java 8", "Java 8 is Awesome!",
"Java 8 is the Best thing Since Sliced Bread!", "Java 8 Changes Everything!" };
final int len = words.length;
final int stopAfter = 100;
final AtomicReference<String> longestString = new AtomicReference<>(words[0]);
final AtomicInteger count = new AtomicInteger(1);
class UpdateLongestStringTask extends RecursiveAction {
private static final long serialVersionUID = -2288401002001447054L;
private int id = -1;
private UpdateLongestStringTask(final int id) {
this.id = id;
}
@Override
protected void compute() {
LOGGER.info("Executing task #: {}.", id);
if (count.get() >= stopAfter) {
return;
}
final ForkJoinTask<Void> task = new UpdateLongestStringTask(count.incrementAndGet()).fork();
updateLongestString(longestString, words[randomIndex()]);
task.join();
}
private int randomIndex() {
return ThreadLocalRandom.current().nextInt(len);
}
}
/* Just because we can. */
final int parallelism = min(getRuntime().availableProcessors(), 4);
new ForkJoinPool(parallelism).invoke(new UpdateLongestStringTask(count.get()));
}
Output (line marked -->> can't be explained; how come it's printing a value it never apparently received):
2015-01-05 23:20:00.974 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 1.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 2.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 3.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 4.
2015-01-05 23:20:00.981 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 Changes Everything!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is Awesome!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is the Best thing Since Sliced Bread!
2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.
-->> 2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.
2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 is Awesome!.
Upvotes: 4
Views: 362
Reputation: 298143
If you provide a function, that function should operate on its arguments, i.e.
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> observed.get().length() - x.length()));
is heavily violating the contract of the functional API. accumulateAndGet
will provide an atomic update regarding the specified operation but not regarding another get
operation within the function.
It’s not clear why you did this as implementing the correct function is straight-forward:
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> str1.length() - str2.length()));
or
final String longestString =
observed.accumulateAndGet(x, maxBy(comparingInt(String::length)));
Note that after fixing the code you still might observe a result different than the two values you have logged before as accumulateAndGet
provides you an atomic update but this atomicity does not expand to logging operations you performed before invoking accumulateAndGet
. The logged content of the AtomicReference
might be outdated at the time when the atomic update is performed. But due to the contract of the update and your provided operator that resulting String
will have at least the same size as the maximum of your previously seen values.
Further, keep in mind that the perceived order of log operations doesn’t say anything about the actual order of performed update operations.
You may get a better view of what happens when changing the code as follows:
public static String updateLongestString(AtomicReference<String> observed, String x) {
final String longestString = observed.accumulateAndGet(x, maxBy((str1, str2) -> {
LOGGER.info("Received str1: {}, str2: {}", str1, str2);
return str1.length() - str2.length();
}));
LOGGER.info("New observed: {}.", longestString);
return longestString;
}
Upvotes: 2