findusl

Reputation: 2654

Catch exception from parallel stream

I have a bunch of columns as string arrays from a CSV file, and now I want to parse them. Since the parsing involves date parsing and other relatively slow operations, I was considering parallelism (I timed it; it does take a noticeable amount of time). My simple approach:

Stream.of(columns).parallel().forEach(column -> 
    result[column.index] = parseColumn(valueCache[column.index], column.type));

columns contains ColumnDescriptor elements, each of which has just two attributes: the index of the column to be parsed and the type, which defines how to parse it. Nothing else. result is an Object array that receives the resulting arrays.

The problem is that the parse function throws a ParseException, which I handle further up the call stack. Since we are running in parallel here, it can't simply be thrown. What is the best way to handle this?

I have this solution, but I kind of cringe reading it. What would be a better way to do it?

final CompletableFuture<ParseException> thrownException = new CompletableFuture<>();
Stream.of(columns).parallel().forEach(column -> {
    try {
        result[column.index] = parseColumn(valueCache[column.index], column.type);
    } catch (ParseException e) {
        thrownException.complete(e);
    }});

if (thrownException.isDone()) {
    // isDone() can only be true if a value has been set.
    throw thrownException.getNow(null);
}

Note: I do not need all the exceptions. If I parse sequentially, I only get one anyway, so that is fine.

Upvotes: 12

Views: 15761

Answers (2)

Holger

Reputation: 298469

The problem is your wrong premise: “Since we are in parallel here it can't just be thrown.” There is no specification forbidding throwing exceptions in parallel processing. You can throw that exception in a parallel stream the same way you do in a sequential stream, wrapping it in an unchecked exception if it is a checked exception.

If there is at least one exception thrown in a thread, the forEach invocation will propagate it (or one of them) to the caller.
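To make this concrete, here is a minimal sketch of the wrap-and-unwrap pattern described above. The parseColumn here is a hypothetical stand-in for the question's parser (it parses an int rather than a date); the cause-chain walk accounts for the fact that the fork/join framework may re-wrap the exception when rethrowing it in the caller's thread.

```java
import java.text.ParseException;
import java.util.stream.IntStream;

public class ParallelParseDemo {

    // Hypothetical stand-in for the question's parseColumn
    static int parseColumn(String value) throws ParseException {
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new ParseException("not a number: " + value, 0);
        }
    }

    static int[] parseAll(String[] values) throws ParseException {
        int[] result = new int[values.length];
        try {
            IntStream.range(0, values.length).parallel().forEach(i -> {
                try {
                    result[i] = parseColumn(values[i]);
                } catch (ParseException e) {
                    // wrap the checked exception so the lambda compiles;
                    // the stream propagates it to the caller
                    throw new RuntimeException(e);
                }
            });
        } catch (RuntimeException e) {
            // walk the cause chain: the fork/join framework may have
            // re-wrapped the exception when crossing threads
            for (Throwable t = e; t != null; t = t.getCause()) {
                if (t instanceof ParseException) {
                    throw (ParseException) t;
                }
            }
            throw e; // some other failure
        }
        return result;
    }

    public static void main(String[] args) throws ParseException {
        int[] ok = parseAll(new String[] {"1", "2", "3"});
        System.out.println(ok[0] + ok[1] + ok[2]); // prints 6
        try {
            parseAll(new String[] {"1", "oops"});
        } catch (ParseException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The caller sees an ordinary checked ParseException again, exactly as it would in the sequential version.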

The only issue you might encounter is that the current implementation doesn’t wait for the completion of all threads when it encounters an exception. This can be worked around using

try {
    Arrays.stream(columns).parallel()
        .forEach(column -> 
            result[column.index] = parseColumn(valueCache[column.index], column.type));
} catch(Throwable t) {
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);
    throw t;
}

But usually, you don’t need it as you won’t access the concurrently processed result in the exceptional case.

Upvotes: 9

john16384

Reputation: 8064

I think the real question is: what do you normally do when parsing serially?

Do you stop at the first exception and abort the entire process? In that case, wrap the exception in a runtime exception and let the stream abort and throw it. Catch the wrapper exception, unwrap it, and deal with it.

Do you skip the bad records? Then either (1) keep track of the errors in a List somewhere, or (2) create a wrapper object that can hold either a parsed result or an error (don't track the exceptions themselves, only the minimum needed to describe the error).

Afterwards, check whether there were errors in the list (first option), or display the records that had errors differently (second option).
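The second option might look like the following sketch. ParseResult and tryParse are illustrative names (not from the question); the wrapper records only an error message, not the exception itself, as suggested above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SkipBadRecords {

    // Wrapper holding either a parsed value or an error description
    static final class ParseResult {
        final Integer value;  // null on failure
        final String error;   // null on success

        ParseResult(Integer value, String error) {
            this.value = value;
            this.error = error;
        }

        boolean isError() {
            return error != null;
        }
    }

    // Hypothetical stand-in for the question's parseColumn:
    // never throws, always returns a ParseResult
    static ParseResult tryParse(String raw) {
        try {
            return new ParseResult(Integer.parseInt(raw.trim()), null);
        } catch (NumberFormatException e) {
            return new ParseResult(null, "unparseable: " + raw);
        }
    }

    public static void main(String[] args) {
        List<ParseResult> results = Stream.of("1", "oops", "3").parallel()
            .map(SkipBadRecords::tryParse)
            .collect(Collectors.toList());

        // afterwards: inspect which records failed
        long bad = results.stream().filter(ParseResult::isError).count();
        System.out.println("errors: " + bad); // prints "errors: 1"
    }
}
```

Because tryParse never throws, the parallel stream needs no special exception handling at all, and good records are kept even when some fail.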

Upvotes: 2
