Reputation: 762
I'm using Akka-Stream 1.0 with a simple reactive stream:
with
override val requestStrategy = new MaxInFlightRequestStrategy(max = 20) {
  override def inFlightInternally: Int = messageBacklog.size
}
The publisher closes the stream after N messages (N is determined dynamically) by sending an OnComplete message.
The subscriber receives the messages and goes into the canceled state right away. The problem is that the subscriber needs some time to process each message, so I usually have a backlog of messages, which can no longer be processed once the subscriber is canceled (IMHO in ActorSubscriber.scala:195).
Processing a message means that my subscriber offloads the work to someone else (sending content back via Spray's ChunkedMessages) and gets an ack message back as soon as a message is completed. As the actor is canceled, the ack message is never processed and the backlog is never worked off.
What is the recommended way to let me complete the backlog?
I could 'invent' my own 'done marker', but that sounds very strange to me. Obviously my code works with MaxInFlightRequestStrategy and a max of 1, as the demand will then always be only 1, meaning I never have a backlog of messages.
Upvotes: 1
Views: 353
Reputation: 762
After long hours of debugging and experimenting, I think I understand what was going on. Hopefully this saves other people some time:
I think I had a conceptual misunderstanding of how to implement a reactive subscriber:
I was spooling messages internally in an ActorSubscriber and releasing those spooled messages back to the business logic at the right time via self ! SpooledMessage, which caused the subscriber's bookkeeping to go crazy: each spooled message was counted twice as 'received', causing the internals to ask for even more messages from upstream.
Processing the spooled messages within the actor itself resolved that problem and also allowed me to use OnComplete properly: as soon as this message is received, the subscriber does not get any new messages, but I process the internal queue on my own (without using self ! ...) and thus complete the whole stream processing.
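For illustration, here is a minimal sketch of the approach described above, not my actual code. The names BacklogSubscriber, Work and Ack and the worker actor are hypothetical stand-ins for the Spray-facing part. It assumes the worker replies with Ack after each element and that Ack messages still reach receive after OnComplete, which is what I observed.

```scala
import akka.actor.ActorRef
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}
import scala.collection.immutable.Queue

// Hypothetical protocol between the subscriber and the worker that sends the chunks.
final case class Work(payload: Any)
case object Ack

class BacklogSubscriber(worker: ActorRef) extends ActorSubscriber {
  // Elements received from upstream and not yet acknowledged;
  // the head is the one currently being handled by the worker.
  private var backlog = Queue.empty[Any]
  private var upstreamCompleted = false

  override val requestStrategy = new MaxInFlightRequestStrategy(max = 20) {
    override def inFlightInternally: Int = backlog.size
  }

  def receive = {
    case OnNext(element) =>
      backlog = backlog.enqueue(element)
      // Hand over work only if nothing else is currently in flight.
      if (backlog.size == 1) worker ! Work(element)

    case Ack if backlog.nonEmpty =>
      // Drop the acknowledged head and continue inside the actor,
      // without re-sending anything through self.
      backlog = backlog.tail
      if (backlog.nonEmpty) worker ! Work(backlog.head)
      else if (upstreamCompleted) context.stop(self)

    case OnComplete =>
      // No further OnNext will arrive; keep draining the backlog via Ack.
      upstreamCompleted = true
      if (backlog.isEmpty) context.stop(self)

    case OnError(_) =>
      // Assumption: on upstream failure the remaining backlog is simply dropped.
      context.stop(self)
  }
}
```

Such an actor would typically be attached with something like Sink.actorSubscriber(Props(new BacklogSubscriber(worker))). The key point is that the queue is only ever touched from within receive, so every upstream element passes through OnNext exactly once and the demand calculation stays correct.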
Upvotes: 0