Reputation: 12908
With the Java 8 Stream API, is it somehow possible to force a Stream to be processed sequentially from within a consumer?
When I have a java.util.function.Consumer that I know is not thread safe, I would like to force the Streams API to process it sequentially, since running it in parallel would 'always' result in buggy behavior.
This snippet illustrates my problem:
static class NonThreadsafeConsumer<T> implements Consumer<T> {
    @Override
    public void accept(T arg0) {
        // Do non-threadsafe stuff here
    }
}

// The three calls below are alternatives: a Stream can only be consumed once.
public void doIt(Stream<String> stream) {
    // Unknown behaviour: depends on whether the caller passed a parallel stream
    stream.forEach(new NonThreadsafeConsumer<>());
    // Bug for sure
    stream.parallel().forEach(new NonThreadsafeConsumer<>());
    // Correct
    stream.sequential().forEach(new NonThreadsafeConsumer<>());
}
The problem I have with this is that I, as the author of the NonThreadsafeConsumer, don't want to trust the implementor of the doIt() method to always know and remember to put in a .sequential() call.
(Note: making the consumer thread safe is not the point of my question, I would just like to know if this CAN be done.)
Upvotes: 1
Views: 381
Reputation: 298499
There is no way to enforce single-threaded use of your Consumer, at least not without overhead of the same order of magnitude as simply making the Consumer thread safe.
But it is not the responsibility of the Consumer to enforce single-threaded usage. In Java, not being thread safe is the norm for mutable classes: StringBuilder, ProcessBuilder, ArrayList, HashMap, any kind of iterator, and DecimalFormat, to name some examples of widely used mutable classes that are not thread safe and do not enforce single-threaded use.
Note that you could simply add synchronized to the consumer's accept method to ensure that only one thread executes it at a time. When it is used in a sequential context, there is a chance that the JVM's optimizer eliminates the associated overhead.
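As a minimal sketch of the synchronized variant: the class names and the ArrayList-based state below are made up for illustration, they only stand in for whatever non-thread-safe work your consumer does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class SynchronizedConsumerDemo {

    // Hypothetical consumer: its state is a plain ArrayList, which is not thread safe.
    static class CollectingConsumer<T> implements Consumer<T> {
        private final List<T> seen = new ArrayList<>();

        // synchronized lets only one thread at a time run the body,
        // even when the stream itself is processed in parallel.
        @Override
        public synchronized void accept(T value) {
            seen.add(value);
        }

        synchronized int size() {
            return seen.size();
        }
    }

    public static void main(String[] args) {
        CollectingConsumer<Integer> consumer = new CollectingConsumer<>();
        IntStream.range(0, 10_000).boxed().parallel().forEach(consumer);
        System.out.println(consumer.size()); // prints 10000
    }
}
```

Note that this only serializes the calls to accept. The stream may still be parallel, so the elements can arrive in any order and on different threads.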
But the simplest solution is to document the requirements and be done with it. If someone uses your class incorrectly, they will get the problems they asked for. You can perform validity checks on a best-effort basis, but trying to make software bulletproof wastes a lot of effort for no benefit.
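One possible best-effort check, sketched under the assumption that the consumer is created on the same thread that later runs the sequential stream: remember the creating thread and fail fast when accept is called from any other thread. The class names are hypothetical, and this does not force sequential processing, it only makes incorrect use visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.stream.Stream;

class ThreadConfinedConsumerDemo {

    // Hypothetical consumer that only accepts calls from the thread that created it.
    static class ThreadConfinedConsumer<T> implements Consumer<T> {
        private final Thread owner = Thread.currentThread();
        private int count; // stands in for the non-thread-safe state

        @Override
        public void accept(T value) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException(
                        "Not thread safe: call .sequential() before forEach");
            }
            count++;
        }

        int count() {
            return count;
        }
    }

    public static void main(String[] args) {
        ThreadConfinedConsumer<String> consumer = new ThreadConfinedConsumer<>();

        // Sequential use on the creating thread passes the check.
        Stream.of("a", "b", "c").sequential().forEach(consumer);
        System.out.println("processed: " + consumer.count());

        // A call from any other thread is rejected.
        try {
            CompletableFuture.runAsync(() -> consumer.accept("x")).join();
        } catch (CompletionException e) {
            System.out.println("rejected: " + e.getCause().getMessage());
        }
    }
}
```

The check is deliberately cheap and incomplete. It also rejects a legitimate sequential use if the consumer is built on one thread and the stream is run on another, which is the kind of trade-off a best-effort check implies.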
Upvotes: 3
Reputation: 1005
You could introduce a factory pattern which forces the user to get the Consumer from a factory instead of instantiating it directly.
The factory could take a boolean parameter which indicates whether the stream is parallel or not.
This isn't the nicest workaround, but it may help the implementer of doIt() remember to make the Stream sequential.
Hope this helps, or at least gives you some ideas of how to go about implementing this.
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public static void main(String[] args) {
    List<String> strings = Arrays.asList("a", "b", "b");
    strings.stream()
           .forEach(ConsumerFactory.getConsumer(false));
}

static class ConsumerFactory {
    // Generic method instead of the raw Consumer type, so callers keep type safety
    static <T> Consumer<T> getConsumer(boolean isPar) {
        if (isPar) {
            // Handle parallel
        }
        return new NonThreadsafeConsumer<>();
    }
}

static class NonThreadsafeConsumer<T> implements Consumer<T> {
    @Override
    public void accept(T t) {
        // Do non-threadsafe stuff here
    }
}
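The parallel branch is left open above. One way it might be filled in, purely as an assumption on my side, is to hand out a wrapper that serializes calls on a private lock when the caller says the stream is parallel. The extra sink parameter is hypothetical and is only there so the effect can be observed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class ConsumerFactorySketch {

    // Stand-in for the non-thread-safe consumer: writes to a plain List.
    static class NonThreadsafeConsumer<T> implements Consumer<T> {
        private final List<T> sink;

        NonThreadsafeConsumer(List<T> sink) {
            this.sink = sink;
        }

        @Override
        public void accept(T t) {
            sink.add(t);
        }
    }

    // isPar is supplied by the caller, as in the factory above; sink is a hypothetical addition.
    static <T> Consumer<T> getConsumer(boolean isPar, List<T> sink) {
        Consumer<T> delegate = new NonThreadsafeConsumer<>(sink);
        if (!isPar) {
            return delegate; // sequential use: no locking needed
        }
        Object lock = new Object();
        // Parallel use: let only one thread at a time reach the delegate.
        return t -> {
            synchronized (lock) {
                delegate.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        IntStream.range(0, 5_000).boxed().parallel()
                 .forEach(getConsumer(true, sink));
        System.out.println(sink.size()); // prints 5000
    }
}
```

This still relies on the caller passing the right flag, so it only reminds the implementer of doIt() about the issue rather than enforcing anything.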
Upvotes: 0