chqdrian

Reputation: 345

Scala HTTP requests using parallel collections

I'm experimenting with Scala parallel collections, trying to get data from a local server that I've set up. This is my code:

import scala.concurrent.duration._
import sttp.client._

val httpRequestInputs = List(inputs).par

def getResponse(data: String, url: String) = {
    val request = basicRequest.body(data).post(uri"$url")
      .headers(Map("content-type" -> "text/plain", "charset" -> "utf-8"))
      .response(asString)

    implicit val backend =
      HttpURLConnectionBackend(options = SttpBackendOptions.connectionTimeout(5.minutes))
    request.readTimeout(5.minutes).send().body
}



// program executes from here
  httpRequestInputs.foreach { input =>
      val result = getResponse(input, url)
      result match {
          case Right(value) => println(value)
          case Left(_) => println("error")
      }
  }

With inputs of a small size there is no problem, but when I try a large input size the program throws a SocketException. I checked the server, and it has no errors; it seems to me that the client is closing the connection early. These large inputs usually take less than 90 seconds to get a response when run individually.

I tried extending the connection and read timeout options in the HTTP request, but I'm still getting the exception.

Can anyone help me understand why the client is closing the connection?

For the HTTP requests, I'm using the client com.softwaremill.sttp.client.

Upvotes: 0

Views: 1249

Answers (1)

Levi Ramsey

Reputation: 20551

If "large input size" means at least several thousand inputs and each of these is connecting to the same remote server, then it's likely that you're exhausting the ephemeral port range where you're running this: essentially there's a limit (which varies from OS to OS) on the number of connections you can make to the same remote host and port within a certain length of time (Windows documentation, but every OS to my knowledge has similar limits).

You'll either need to catch the exception and retry, or throttle the connection attempts so that you don't exhaust the range. (In rare cases, if you're not exceeding the limit by much, there may be an OS configuration option which lets you raise it.)
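For the retry approach, a minimal wrapper could look like this (a sketch; withRetry is a hypothetical helper, not something sttp provides):

import java.net.SocketException

// Hypothetical helper: retry `attempt` up to `maxRetries` times on
// SocketException, doubling the backoff so the OS has time to free ports
def withRetry[A](maxRetries: Int, backoffMillis: Long = 1000L)(attempt: => A): A =
  try attempt
  catch {
    case _: SocketException if maxRetries > 0 =>
      Thread.sleep(backoffMillis)
      withRetry(maxRetries - 1, backoffMillis * 2)(attempt)
  }

// usage: val result = withRetry(maxRetries = 3) { getResponse(input, url) }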

A good way to throttle this using the Scala standard library is to use Future:

import scala.concurrent.{ ExecutionContext, Future }

implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
  new java.util.concurrent.ForkJoinPool(1000) // Allow 1000 requests to be executing at once
)

val allRequestsFut =
  Future.sequence(
    // map over the plain List (not the .par collection) so that
    // Future.sequence can collect the results
    httpRequestInputs.seq.map { input =>
      Future { getResponse(input, url) }.map {
        case Right(value) => println(value)
        case Left(err) => println(s"error: $err")
      }
    }
  )

allRequestsFut.foreach { _ =>
  println("all requests complete")
}

Note that many OSes (Linux included) will continue to reserve an ephemeral port for some time after the connection using it has closed. To throttle requests dynamically, I would suggest using something like Akka Streams.
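For example, something like this (a sketch, assuming akka-stream is on the classpath; the parallelism cap of 100 is arbitrary):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("requests")
import system.dispatcher

// mapAsync caps the number of requests in flight, so ports get
// recycled as responses come back instead of all being opened at once
val done =
  Source(httpRequestInputs.toList)
    .mapAsync(parallelism = 100)(input => Future(getResponse(input, url)))
    .runWith(Sink.foreach {
      case Right(value) => println(value)
      case Left(err)    => println(s"error: $err")
    })

done.onComplete(_ => system.terminate())

Because mapAsync applies backpressure, the stream only starts a new request when one of the 100 in-flight slots frees up, which is exactly the dynamic throttling you want here.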

Upvotes: 3
