TalBeno
TalBeno

Reputation: 289

Akka HTTP Stream listener stops processing databytes after a while

I have an app which has 3 HTTP listeners like this one:

val futureResponse1: Future[HttpResponse] =
  Http().singleRequest(HttpRequest(uri = someUrl))

each of the 3 is listening to a non stop stream (each to a different one). And handles it with a simple flow that starts with grouping and then a relatively fast processing (non-blocking):

futureResponse1.flatMap {response =>
  response.status match {
    case StatusCodes.OK =>
      val source: Source[ByteString, Any] = response.entity.dataBytes
      source.
        grouped(100).                
        map(doSomethingFast).                               
        runWith(Sink.ignore)                        

    case notOK => system.log.info("failed opening, status: " + notOK.toString())
  }

...

I get no exceptions or warnings. But after a while (could be 15-25 minutes) the listeners are just suddenly stopping. One after the other (not together).

Maybe its the grouped phase that is the problem there? Or maybe the connection/stream just stops? Or the dispatcher that is shared by them is getting starved / something not getting released.

Any ideas why that may be happening please?

==== update ====

@Ramon J Romero y Vigil I changed my run to have only 1 stream instead of 3, and I removed the grouped stage. Still happening after few minutes. I suspect that the stream is closing based on timeout. all I do is get chunks and consume them.

==== update ====

found the reason, see below.

Upvotes: 1

Views: 279

Answers (1)

TalBeno
TalBeno

Reputation: 289

that was the reason:

EntityStreamSizeException: actual entity size (None) exceeded content length limit (8388608 bytes)! You can configure this by setting akka.http.[server|client].parsing.max-content-length or calling HttpEntity.withSizeLimit before materializing the dataBytes stream.

For anyone seeking the solution in the case of continuous response stream, you can get the source this way, using withoutSizeLimit:

val source: Source[ByteString, Any] = response.entity.withoutSizeLimit().dataBytes

Upvotes: 1

Related Questions