pvgoddijn

Reputation: 12908

Force sequential evaluation for non thread-safe Consumer in Java

With the Java 8 Stream API, is it possible to force a Stream to be processed sequentially from within a Consumer?

When I have a java.util.function.Consumer that I know is not thread-safe, I would like to force the Stream API to process it sequentially, since running it in parallel would 'always' result in buggy behavior.

This snippet illustrates my problem:

static class NonThreadsafeConsumer<T> implements Consumer<T> {

    @Override
    public void accept(T arg0) {
        //Do non-threadsafe stuff here
    }
}

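public void doIt(Stream<String> stream) {
    // The three calls below are alternatives; a stream can only be consumed once.

    // Unknown behaviour: depends on whether the caller passed a parallel stream
    stream.forEach(new NonThreadsafeConsumer<>());
    // Bug for sure
    stream.parallel().forEach(new NonThreadsafeConsumer<>());
    // Correct
    stream.sequential().forEach(new NonThreadsafeConsumer<>());
}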

The problem I have with this is that, as the author of NonThreadsafeConsumer, I don't want to trust the implementor of the doIt() method to always know and remember to put in a .sequential() call.

(Note: making the consumer thread-safe is not the point of my question; I would just like to know if this CAN be done.)

Upvotes: 1

Views: 381

Answers (2)

Holger

Reputation: 298499

There is no way of enforcing single-threaded use of your Consumer, at least not without an overhead of the same order of magnitude as just making the Consumer thread-safe.

But it is not the responsibility of the Consumer to enforce single-threaded usage. In Java, not being thread-safe is the norm for mutable classes: StringBuilder, ProcessBuilder, ArrayList, HashMap, any kind of iterator, and DecimalFormat are some examples of widely used mutable classes which are not thread-safe and do not enforce single-threaded use.

Note that you could simply add the synchronized modifier to the consumer’s accept method to ensure that only one thread executes it at a time. When it is used in a sequential context, there is a chance that the JVM’s optimizer eliminates the associated overhead.
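
As a rough illustration of the synchronized approach, here is a minimal sketch. The class name SynchronizedCollectingConsumer and its ArrayList state are made up for the example and merely stand in for whatever non-thread-safe work the real consumer does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Hypothetical consumer: its state (a plain ArrayList) is not thread-safe.
class SynchronizedCollectingConsumer<T> implements Consumer<T> {
    private final List<T> seen = new ArrayList<>();

    // synchronized: at most one thread at a time executes accept() on this instance.
    @Override
    public synchronized void accept(T value) {
        seen.add(value);
    }

    // Reads take the same lock so they observe all completed writes.
    public synchronized int count() {
        return seen.size();
    }
}

class SynchronizedConsumerDemo {
    public static void main(String[] args) {
        SynchronizedCollectingConsumer<Integer> consumer = new SynchronizedCollectingConsumer<>();
        // Even on a parallel stream, the calls to accept() are serialized by the lock.
        IntStream.range(0, 10_000).boxed().parallel().forEach(consumer);
        System.out.println(consumer.count()); // prints 10000
    }
}
```

This only serializes the calls; it does not make the stream sequential, so the order in which elements arrive is still unspecified for a parallel stream.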

But the simplest solution is to document the requirements and be done with it. If someone uses your class incorrectly, they will get the problems they asked for. You can perform validity checks on a best-effort basis, but trying to make software bulletproof wastes a lot of effort for no benefit.
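
One possible shape for such a best-effort check is sketched below. It assumes that "used correctly" means "always called from the same thread", which is stricter than strictly necessary, and the class name ThreadConfinedConsumer is hypothetical. It will not catch every misuse (a parallel stream that happens to run everything on one thread goes unnoticed), so treat it as a debugging aid rather than enforcement.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical consumer that fails fast when called from more than one thread.
class ThreadConfinedConsumer<T> implements Consumer<T> {
    // The first thread that calls accept() becomes the owner.
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private int calls; // deliberately unsynchronized, stands in for non-thread-safe state

    @Override
    public void accept(T value) {
        Thread current = Thread.currentThread();
        // Succeeds only for the very first caller; later calls leave the owner unchanged.
        owner.compareAndSet(null, current);
        if (owner.get() != current) {
            throw new IllegalStateException(
                "Not thread-safe: owned by " + owner.get().getName()
                + " but called from " + current.getName());
        }
        calls++; // the non-thread-safe work would go here
    }

    int calls() {
        return calls;
    }
}
```

With a parallel stream, the exception thrown on a worker thread is propagated to the caller of forEach, so the misuse surfaces at the call site instead of as silent corruption.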

Upvotes: 3

Garreth Golding

Reputation: 1005

You could introduce a factory that forces the user to obtain the Consumer from it instead of instantiating the Consumer directly.

The factory could take a boolean parameter which indicates whether the stream is parallel or not.

This isn't the nicest workaround, but it may help the implementer of doIt() remember to make the Stream sequential.

Hope this helps, or at least gives you some ideas of how to go about implementing this.

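// Needs: import java.util.Arrays; import java.util.List; import java.util.function.Consumer;
public static void main(String[] args) {
    List<String> strings = Arrays.asList("a", "b", "b");

    // The caller states explicitly that the stream is not parallel
    strings.stream()
            .forEach(ConsumerFactory.getConsumer(false));
}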

static class ConsumerFactory {
    // Generic method instead of the raw Consumer type, so callers get a typed Consumer<T>
    static <T> Consumer<T> getConsumer(boolean isPar) {
        if (isPar) {
            //Handle Parallel
        }

        return new NonThreadsafeConsumer<>();
    }
}

static class NonThreadsafeConsumer<T> implements Consumer<T> {
    @Override
    public void accept(T t) {
        //Do non-threadsafe stuff here
    }
}
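
What "Handle Parallel" should do is left open above. One option, sketched here purely as an assumption, is to wrap the consumer so that its calls are serialized when the caller says the stream is parallel. GuardedConsumerFactory, guard and guardFor are hypothetical names; the second method reads the flag from the stream via BaseStream.isParallel() instead of trusting a hand-written boolean.

```java
import java.util.function.Consumer;
import java.util.stream.Stream;

class GuardedConsumerFactory {
    // Returns the delegate unchanged for sequential use,
    // or a wrapper that serializes all calls for parallel use.
    static <T> Consumer<T> guard(Consumer<T> delegate, boolean isPar) {
        if (!isPar) {
            return delegate;
        }
        Object lock = new Object();
        return value -> {
            synchronized (lock) {
                delegate.accept(value);
            }
        };
    }

    // Variant that asks the stream itself; isParallel() does not consume the stream.
    static <T> Consumer<T> guardFor(Stream<T> stream, Consumer<T> delegate) {
        return guard(delegate, stream.isParallel());
    }
}
```

A caller would then write something like stream.forEach(GuardedConsumerFactory.guardFor(stream, consumer)). Nothing stops the caller from bypassing the factory, so this remains a convenience, not a guarantee.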

Upvotes: 0
