Reputation: 183
I've got an issue with akka-http. I'm trying to request several times in a flow but it stops with default configuration at 4 times. Here is the code I use. Can someone help me understanding why it waits?
Thanks a lot
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.{HostConnectionPool, OutgoingConnection}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream._
import akka.stream.scaladsl._
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.Future
import scala.util.{Try, Failure, Success}
object TestHttp extends LazyLogging {
def main(args: Array[String]) {
implicit val system = ActorSystem()
import system.dispatcher
val decider: Supervision.Decider = {
case e => {
logger.error(e.getMessage, e)
Supervision.Resume
}
}
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider).withDebugLogging(true))
val resultSink: Sink[(Try[HttpResponse], Int), Future[Unit]] = Sink.foreach { case (hr, i) => {
hr match {
case Success(r) => logger.info(s"Success ${r} for ${i}")
case Failure(e) => logger.error(s"Error ${e} for ${i}")
}
}
}
val source: Source[Int, Unit] = Source(0 to 1500).map(i => {
logger.info(s"${i} iteration")
i
})
val buildHr: Flow[Int, (HttpRequest, Int), Unit] = Flow[Int].map { case s => {
(HttpRequest(uri = "/").withDefaultHeaders(), s)
}
}
val connection: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), HostConnectionPool] = Http().cachedHostConnectionPool("www.adajam.uk.com")
import FlowGraph.Implicits._
source.via(buildHr).via(connection).to(resultSink).run()
}
}
Upvotes: 3
Views: 300
Reputation: 183
In order to release the connection, the body has to be read. This can be done with
r.entity.dataBytes.to(Sink.ignore)
Upvotes: 3