asinduvg
asinduvg

Reputation: 67

How to consume HttpResponse in Akka Http

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import spray.json._

import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.{Failure, Success}

object SoftwareRegistry extends App with Formatter {

  implicit val system = ActorSystem("NPMRegistry")
  implicit val materializer = ActorMaterializer()

  case class NPMPackage(name: String)

  // reading the packages
  val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
  val bufferedSource = scala.io.Source.fromFile(filename)
  val listOfPackages: List[NPMPackage] = (for (line <- bufferedSource.getLines) yield {
    NPMPackage(line.trim)
  }).toList
  bufferedSource.close()

  // requests
  val serverHttpRequests = listOfPackages.map(pkg =>
    (HttpRequest(
      HttpMethods.GET,
      uri = Uri(s"/${pkg.name}")
    ),
      UUID.randomUUID().toString)
  )

  // source
  val sourceList = Source(serverHttpRequests)
  val bufferedFlow = Flow[(HttpRequest, String)]
    .buffer(10, overflowStrategy = OverflowStrategy.backpressure)
    .throttle(1, 3 seconds)

  val dd = sourceList
    .via(bufferedFlow).async
    .via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
    .runForeach {
      case (Success(response), oId) =>
        println(s"$oId $response")
      case (Failure(ex), oId) => println(ex)
    }

In the above code, I can print the response to the console and I want to know how to consume entity and access the data from the response in a streamed way, not in a future.

Following is the result of the existing code enter image description here

Upvotes: 1

Views: 329

Answers (1)

artur
artur

Reputation: 1760

You basically need to keep you logic within Akka Streams API and not terminating it with runForEach like you did.

A simplified example of this can look like this:

.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
    .flatMapConcat {
      case (Success(response), _) => Source.single(response)
      case (Failure(_), _) => Source.empty //warning, ignoring errors
    }
    .map(httpResponse => httpResponse.entity)
    .flatMapConcat(e => e.getDataBytes().map(bytes => ???))
    .runWith(Sink.ignore)

Instead of runforEach I am flatMapConcatenating to get the HttpRespnse ignoring errors and the context of the request for simplicity of the example. Then mapping to get HttpEntity and then flatMapConcatenating again to get the ByteStrings representing the response body. There could be multiple of those coming form every HttpRequest and that's, what I am guessing you're referring to by "streamed way".

Upvotes: 1

Related Questions