Bill'o

Reputation: 514

java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber when using a custom Play Framework body parser as a stream

After looking around Stack Overflow and GitHub issues, it seems this has happened several times already in different contexts but never received any real help or solution, so I am trying my luck once again. We are using a fairly standard custom Play Framework body parser that exposes the request body as a BinaryStream:

private val streamParser: BodyParser[AkkaStreams.BinaryStream] =
  BodyParser { _ =>
    Accumulator.source[ByteString].map(Right.apply)
  }

That said, it randomly crashes with the aforementioned exception while handling concurrent requests. It seems a subscriber is trying to subscribe to an already used publisher that was initially generated by the parser. Under the hood, Play creates an Akka Streams Sink, then a Publisher, then a Source that explicitly allows only one subscriber:

def source[E]: Accumulator[E, Source[E, _]] = {
  // If Akka streams ever provides Sink.source(), we should use that instead.
  // https://github.com/akka/akka/issues/18406
  new SinkAccumulator(
    Sink
      .asPublisher[E](fanout = false)
      .mapMaterializedValue(publisher => Future.successful(Source.fromPublisher(publisher)))
  )
}
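To illustrate the single-subscriber behaviour in isolation, here is a minimal sketch (not our production code) that builds a Source the same way and runs it twice. It assumes Akka 2.6+, where an implicit ActorSystem provides the materializer; `SingleSubscriberDemo` and `run` are made-up names for the example. The second run should fail with the same IllegalStateException.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; the names are illustrative only.
object SingleSubscriberDemo {

  // Runs the same publisher-backed Source twice and returns both outcomes.
  def run()(implicit system: ActorSystem): (Try[Seq[ByteString]], Try[Seq[ByteString]]) = {
    // Same construction as Accumulator.source: a Sink that materializes
    // into a Publisher accepting exactly one subscriber.
    val publisher =
      Source.single(ByteString("body")).runWith(Sink.asPublisher[ByteString](fanout = false))
    val body = Source.fromPublisher(publisher)

    // First materialization subscribes to the publisher and drains it.
    val first = Try(Await.result(body.runWith(Sink.seq), 5.seconds))
    // Second materialization tries to subscribe to the same publisher again.
    val second = Try(Await.result(body.runWith(Sink.seq), 5.seconds))
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("demo")
    try {
      val (first, second) = run()
      println(s"first run:  $first")
      println(s"second run: $second")
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

If this matches what happens in our application, it would mean the request body Source is being materialized more than once somewhere, rather than the parser itself being at fault.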

Using the same code with fanout = true leads to another error that is not really helpful either: akka.stream.AbruptTerminationException: Processor actor [Actor[akka://application/system/Materializers/StreamSupervisor-0/flow-6131-1-fanoutPublisherSink#-1526957059]] terminated abruptly

Changing the Akka Streams configuration seems to hide the error (by turning it into a warning) or reduce how often it occurs, but it doesn't explain why this happens:

akka {
  stream {
    materializer {
      debug-logging = on
      stage-errors-default-log-level = debug
      subscription-timeout {
        mode = warn
        timeout = 50s
      }
    }
  }
}

The GitHub issue linked in the library source hasn't moved for a few years now. I must say that even after a few days/weeks of investigation, it is still unclear why this happens, in what context, and whether we are running into shared state or some kind of race condition. We haven't found a reliable way to reproduce it either. Here is the state of the Publisher when the exception is raised: (screenshot omitted)
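One way to test the shared-state hypothesis would be to count how often the body Source is materialized. The sketch below is only an idea, not something from Play or Akka: `MaterializationTracer` and `traced` are hypothetical names. It relies on the fact that the function passed to `mapMaterializedValue` is invoked once per materialization, so a count above one should point at the code path that runs the Source a second time.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.stream.scaladsl.Source

// Hypothetical debugging helper, not part of Play or Akka.
object MaterializationTracer {

  // Wraps a Source so that every materialization increments `counter`.
  // From the second materialization on, a stack trace is printed to help
  // locate the caller that runs the Source again.
  def traced[E, M](
      source: Source[E, M],
      label: String,
      counter: AtomicInteger = new AtomicInteger(0)
  ): Source[E, M] =
    source.mapMaterializedValue { mat =>
      val n = counter.incrementAndGet()
      if (n > 1) {
        new IllegalStateException(s"$label materialized $n times").printStackTrace()
      }
      mat // pass the original materialized value through unchanged
    }
}
```

Applied to the Source produced by the body parser, this would at least tell us whether a single request body is ever run twice before the exception is raised.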

Any tips, debugging hints, or help would be greatly appreciated!

Upvotes: 0

Views: 138

Answers (0)
