Reputation: 70017
I have the following (simplified) consumer that is supposed to cancel an Akka-Stream (2.11_2.5-SNAPSHOT of today) Source
after the first item but the onNext
is still getting called 4 times:
static Subscriber<Object> println() {
return new Subscriber<Object>() {
Subscription s;
int n;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(5);
}
@Override
public void onNext(Object t) {
System.out.println(Thread.currentThread().getName()
+ ": " + t + " - " + (++n));
if (s != null) {
s.cancel();
s = null;
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + ": DONE");
}
};
}
public static void main(String[] args) throws Exception {
Config cfg = ConfigFactory.parseResources(
AkkaRange.class, "/akka-streams.conf").resolve();
ActorSystem actorSystem = ActorSystem.create("sys", cfg);
ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
Source<Integer, NotUsed> source = Source.repeat(1);
Publisher<Integer> p = source.runWith(Sink.asPublisher(
AsPublisher.WITH_FANOUT), materializer);
p.subscribe(println());
Thread.sleep(1000);
actorSystem.terminate();
}
Given that request is 5 yet only 4 calls are made, I assume the underlying messaging architecture responds to requests in 4-batches before checking the message queue for a cancellation (or further request) messages.
Is there a setting to make the cancellation happen more eagerly?
The use case is something like an interop computation where there is a computationally intensive stage (map) that may produce a desired result after 1-2 source elements and the downstream cancels the stream in this case. The problem is that due to this 4 batch, the computation is executed for the remaining 2-3 elements too.
Upvotes: 0
Views: 409
Reputation: 128111
The Subscriber
interface is a part of the Reactive Streams specification, which lots of libraries, akka-streams included, implement. And this specification states the following thing:
A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.
Therefore you have to handle this situation in your subscriber manually, otherwise it would be violating the specification and thus unfit to be used with the libraries implementing the spec.
Upvotes: 0