Sepfins

Reputation: 312

Exception propagation in Java parallel streams

The book Akka in Action says:

Exceptions are almost impossible to share between threads out of the box, unless you are prepared to build a lot of infrastructure to handle this.

Yet, as far as I understand, if an exception occurs in one of a parallel stream's worker threads, it is propagated to the caller. If this mechanism is possible, why isn't it implemented for regular threads? Am I missing something?

Edit: I am talking about the possibility of something like this:

public static void count() {
    long count = 0;
    try {
        count = IntStream.range(1, 10)
                .parallel()
                .filter(number -> f(number)).count();
    } catch(RuntimeException e) {
        /* handle */
    }
    System.out.println("Count - " + count);
}

public static boolean f(final int number) {
    if(Math.random() < 0.1) {
        throw new RuntimeException();
    }
    return true;
}

parallel() runs the pipeline on multiple threads, and when a RuntimeException is thrown on any of them, it is still caught on the main thread, which seems to contradict the book's point.

Edit 2:

Example with exception unable to propagate outside of a thread

Upvotes: 1

Views: 818

Answers (1)

daniu

Reputation: 15008

The main difference is that while a Stream's intermediate operations can run in parallel, they are only evaluated when the terminal operation is invoked, and the terminal operation blocks the caller until all of that work has finished. That makes it a virtual join point.
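A minimal sketch of that join point, assuming a filter that always throws (the class name LazyStreamDemo and the call counter are made up for illustration). Building the pipeline runs nothing; the exception only surfaces when the terminal operation count() is called, and it surfaces on the calling thread:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class LazyStreamDemo {
    // Builds a parallel pipeline whose filter always throws, then reports
    // where the exception surfaces.
    static String run() {
        AtomicInteger calls = new AtomicInteger();
        IntStream pipeline = IntStream.range(1, 10)
                .parallel()
                .filter(n -> {
                    calls.incrementAndGet();
                    throw new IllegalStateException("failed on " + n);
                });
        // No terminal operation has been invoked yet, so the filter has not run.
        int callsBeforeTerminal = calls.get();
        try {
            pipeline.count(); // terminal operation: evaluates and waits for the workers
            return "no exception";
        } catch (RuntimeException e) {
            // The failure from a worker is rethrown here, in the caller.
            return "before=" + callsBeforeTerminal + ", caught in caller";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller's try block can catch the exception only because count() does not return until the parallel work has completed or failed.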

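I.e., the same would be possible with something like the following (pseudocode: Thread.join() does not rethrow the thread's exceptions by itself, and ExceptionThrownInThread is a placeholder):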

try {
    Thread concurrent = new Thread(runnable);
    concurrent.start();
    concurrent.join();
} catch (ExceptionThrownInThread ex) {}
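The snippet above is schematic. A runnable sketch of the same idea has to carry the exception across the join by hand, for example as below. The names JoinAndRethrow and runAndJoin are hypothetical, and only RuntimeExceptions are forwarded to keep it short:

```java
import java.util.concurrent.atomic.AtomicReference;

class JoinAndRethrow {
    // Runs the task on a new thread, waits for it to finish, and rethrows
    // on the calling thread any RuntimeException the task raised.
    static void runAndJoin(Runnable task) {
        AtomicReference<RuntimeException> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                failure.set(e); // hand the exception over to the caller
            }
        });
        worker.start();
        try {
            worker.join(); // the join point: the worker has finished after this
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
        }
        RuntimeException e = failure.get();
        if (e != null) {
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            runAndJoin(() -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException ex) {
            System.out.println("Caught in caller: " + ex.getMessage());
        }
    }
}
```

This works only because the caller blocks in join(); without that wait there is no point in the caller's code at which the exception could be rethrown.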

However, in the general case - and that's pretty much Akka's programming model - you have

yourMessenger.registerCallbacks(callbacks);
new Thread(yourMessenger).start(); 

Now, the callbacks will eventually be called from within the thread you created, but there is no structure to wrap around its execution as a whole; so who would catch this exception?
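To illustrate, here is a small sketch (DetachedThreadDemo is a made-up name) in which a try/catch around start() never sees the exception thrown inside the thread. The only place it shows up is the thread's own uncaught-exception handler. The join() at the end exists only so the demo can inspect the result; a real fire-and-forget caller would not wait:

```java
import java.util.concurrent.atomic.AtomicReference;

class DetachedThreadDemo {
    static String run() {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        boolean caughtByCaller = false;
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("callback failed");
        });
        // The handler is invoked on the worker thread when the exception escapes run().
        worker.setUncaughtExceptionHandler((thread, error) -> seenByHandler.set(error));
        try {
            worker.start(); // returns immediately; the failure happens later, elsewhere
        } catch (RuntimeException e) {
            caughtByCaller = true; // not reached for exceptions thrown inside the worker
        }
        try {
            worker.join(); // only so that the demo can report what happened
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        Throwable seen = seenByHandler.get();
        return "caughtByCaller=" + caughtByCaller
                + ", handler saw: " + (seen == null ? "nothing" : seen.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```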

I don't know Akka well enough, but with Project Reactor's Publishers you can register an error handler, as in

Mono<Result> mono = somethread.createResult().doOnError(errorHandler);
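For readers without Reactor on the classpath, the same pattern of attaching an error handler to an asynchronous result can be sketched with the JDK's CompletableFuture. This is a swapped-in analogue, not Reactor's API, and ErrorHandlerDemo is a hypothetical name:

```java
import java.util.concurrent.CompletableFuture;

class ErrorHandlerDemo {
    static String run() {
        CompletableFuture<String> result = CompletableFuture
                // The supplier runs on another thread and fails there.
                .<String>supplyAsync(() -> { throw new IllegalStateException("no result"); })
                // The handler is registered up front and receives the failure as a value.
                .exceptionally(error -> {
                    Throwable root = error.getCause() != null ? error.getCause() : error;
                    return "recovered from: " + root.getMessage();
                });
        return result.join(); // waits for the handled result
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the exception is not propagated up a call stack at all; it is delivered to whichever handler was attached to the result.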

But again, in the general case it's not trivial.

Upvotes: 1
