Reputation: 1202
I am trying to download a file from S3 using the following code:
wsClient
  .url(url)
  .withMethod("GET")
  .withHttpHeaders(my_headers: _*)
  .withRequestTimeout(timeout)
  .stream()
  .map {
    case AhcWSResponse(underlying) =>
      underlying.bodyAsBytes
  }
When I run this I get the following exception:
akka.stream.StreamLimitReachedException: limit of 13 reached
Is this because I am using bodyAsBytes? What does this error mean? I also see this warning message, which is probably related:
blockingToByteString is a blocking and unsafe operation!
Upvotes: 0
Views: 330
Reputation: 927
This happens because when you use stream(), you are expected to consume the response body through bodyAsSource. It is important to do so; otherwise the unconsumed body backpressures the connection. body and bodyAsBytes are implemented and do consume the source, but for some reason the implementor decided to let you know that you should have used execute() instead of stream() by limiting the body to 13 ByteStrings and a 50 ms timeout.
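A minimal sketch of consuming the streamed body through bodyAsSource instead of bodyAsBytes. It assumes Play WS on Akka 2.6 or later; the object name StreamedDownload, the helper drainBody, and the parameter names are hypothetical and only there for illustration.

```scala
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import play.api.libs.ws.WSClient

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object StreamedDownload {

  // Hypothetical helper: concatenates every chunk of the body as it arrives.
  // Nothing here caps the number of chunks.
  def drainBody(body: Source[ByteString, _])(implicit mat: Materializer): Future[ByteString] =
    body.runFold(ByteString.empty)(_ ++ _)

  // Same request as in the question, but the body is read via bodyAsSource.
  def download(
      wsClient: WSClient,
      url: String,
      headers: Seq[(String, String)],
      timeout: FiniteDuration
  )(implicit mat: Materializer, ec: ExecutionContext): Future[ByteString] =
    wsClient
      .url(url)
      .withMethod("GET")
      .withHttpHeaders(headers: _*)
      .withRequestTimeout(timeout)
      .stream()
      .flatMap(response => drainBody(response.bodyAsSource))
}
```

Folding everything into one ByteString keeps the whole file in memory. For large S3 objects it is probably better to run bodyAsSource into a file sink such as FileIO.toPath.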
Upvotes: 1
Reputation: 1371
You are getting StreamLimitReachedException because the number of incoming elements is larger than the max passed to limit.
val MAX_ALLOWED_SIZE = 100

// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val limited: Future[Seq[String]] =
  mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq)

// OK. Collect up until max-th elements only, then cancel upstream
val ignoreOverflow: Future[Seq[String]] =
  mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq)
You can find more information about stream processing here
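To see the difference between limit and take with the numbers from the question, here is a small self-contained sketch. It assumes Akka 2.6 or later, where an implicit ActorSystem provides the materializer; the object name LimitVsTake and the 20-element source are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object LimitVsTake {
  implicit val system: ActorSystem = ActorSystem("limit-vs-take")

  // 20 elements, i.e. more than the limit of 13 seen in the question.
  private val chunks = Source(1 to 20)

  // limit(13) fails the stream once a 14th element arrives.
  def limited(): Try[Seq[Int]] =
    Try(Await.result(chunks.limit(13).runWith(Sink.seq), 3.seconds))

  // take(13) keeps the first 13 elements and then cancels upstream.
  def taken(): Seq[Int] =
    Await.result(chunks.take(13).runWith(Sink.seq), 3.seconds)

  def main(args: Array[String]): Unit = {
    println(limited()) // a Failure wrapping StreamLimitReachedException
    println(taken())   // the first 13 elements
    system.terminate()
  }
}
```

The Await calls are only there to keep the demo short; in application code you would normally compose the futures instead of blocking.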
Upvotes: 0