Reputation: 289
I have an app which has 3 HTTP listeners like this one:
val futureResponse1: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = someUrl))
Each of the 3 listens to a different never-ending stream, and handles it with a simple flow that starts with grouping, followed by relatively fast, non-blocking processing:
futureResponse1.flatMap { response =>
  response.status match {
    case StatusCodes.OK =>
      val source: Source[ByteString, Any] = response.entity.dataBytes
      source
        .grouped(100)
        .map(doSomethingFast)
        .runWith(Sink.ignore)
    case notOK => system.log.info("failed opening, status: " + notOK.toString())
  }
...
I get no exceptions or warnings, but after a while (could be 15-25 minutes) the listeners suddenly stop, one after the other (not all together).
Maybe the grouped stage is the problem? Or maybe the connection/stream just stops? Or maybe the dispatcher they share is getting starved, or something is not being released?
Any ideas why this may be happening?
==== update ====
@Ramon J Romero y Vigil I changed my run to have only 1 stream instead of 3, and I removed the grouped stage. It still happens after a few minutes. I suspect that the stream is being closed by a timeout. All I do is receive chunks and consume them.
==== update ====
Found the reason, see below.
Upvotes: 1
Views: 279
Reputation: 289
This was the reason:
EntityStreamSizeException: actual entity size (None) exceeded content length limit (8388608 bytes)! You can configure this by setting akka.http.[server|client].parsing.max-content-length or calling HttpEntity.withSizeLimit before materializing the dataBytes stream.
For anyone seeking a solution for a continuous response stream: you can obtain the source with withoutSizeLimit, which lifts the content length limit for this entity:
val source: Source[ByteString, Any] = response.entity.withoutSizeLimit().dataBytes
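A stream failure like the one above is reported through the Future returned by runWith, so it goes unnoticed if that Future is never inspected. Below is a minimal sketch that lifts the size limit and also logs how the stream ended. It assumes Akka 2.6+ with Akka HTTP 10.2+ (where an implicit ActorSystem is enough to run a stream); StreamConsumer, consume and the body of doSomethingFast are hypothetical names used only for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.stream.scaladsl.Sink
import akka.util.ByteString

import scala.concurrent.Future
import scala.util.{Failure, Success}

object StreamConsumer {
  // Hypothetical stand-in for the fast, non-blocking processing step.
  def doSomethingFast(batch: Seq[ByteString]): Unit = ()

  // Consumes the response body. The returned Future completes when the
  // stream ends, and fails if the stream (or the initial status check) fails.
  def consume(response: HttpResponse)(implicit system: ActorSystem): Future[Done] = {
    import system.dispatcher // ExecutionContext for onComplete

    response.status match {
      case StatusCodes.OK =>
        val done: Future[Done] =
          response.entity
            .withoutSizeLimit() // lift the content length limit for this entity
            .dataBytes
            .grouped(100)
            .map(doSomethingFast)
            .runWith(Sink.ignore)

        // Log the outcome so that a failing stream does not stop silently.
        done.onComplete {
          case Success(_) => system.log.info("stream completed")
          case Failure(e) => system.log.error(e, "stream failed")
        }
        done

      case notOK =>
        // Drain the unused entity so the connection is not left stalled.
        response.discardEntityBytes()
        Future.failed(new IllegalStateException("failed opening, status: " + notOK))
    }
  }
}
```

With this in place, the original call site could become futureResponse1.flatMap(response => StreamConsumer.consume(response)), and both branches then return a Future[Done].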
Upvotes: 1