Reputation: 514
After looking around stackoverflow and github issues, it seems this happened several times already in different contexts but never received any real help nor solutions so I try my luck once again. We are using a pretty standard but custom Playframework bodyparser transforming the request body as a BinaryStream
private val streamParser: BodyParser[AkkaStreams.BinaryStream] =
BodyParser { _ =>
Accumulator.source[ByteString].map(Right.apply)
}
that said, it randomly crashes while handling concurrent requests with forementioned exception. Indeed is seems a subscriber is trying to subscribe to the an already used publisher initially generated by the parser. Under the hood it is creating a akka stream Sink then a Publisher then a Source with explicitly only one subscriber possible.
def source[E]: Accumulator[E, Source[E, _]] = {
// If Akka streams ever provides Sink.source(), we should use that instead.
// https://github.com/akka/akka/issues/18406
new SinkAccumulator(
Sink
.asPublisher[E](fanout = false)
.mapMaterializedValue(publisher =>
Future.successful(Source.fromPublisher(publisher)))
)
}
Using the same code with fanout = true
leads to another error not really helpful either:
akka.stream.AbruptTerminationException: Processor actor [Actor[akka://application/system/Materializers/StreamSupervisor-0/flow-6131-1-fanoutPublisherSink#-1526957059]] terminated abruptly
Changing akka streams configuration seems to hide (as warning) or slow down the error's rate but doesn't explain why this happens:
akka {
stream {
materializer {
debug-logging = on
stage-errors-default-log-level = debug
subscription-timeout {
mode = warn
timeout = 50s
}
}
}
}
The github issue linked in the library hasn't moved for few years now. I must say that even after few days/week of investigation, it's pretty unclear why this happens, in what context and if we are running into a shared state or some kind of race condition issue. We haven't identified any repeatable way to reproduce it either. Here is the state of the Published when the exception is raised:
any tips, debug hints or help would be greatly appreciated!
Upvotes: 0
Views: 138