Reputation: 67
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import spray.json._
import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.{Failure, Success}
object SoftwareRegistry extends App with Formatter {
implicit val system = ActorSystem("NPMRegistry")
implicit val materializer = ActorMaterializer()
case class NPMPackage(name: String)
// reading the packages
val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
val bufferedSource = scala.io.Source.fromFile(filename)
val listOfPackages: List[NPMPackage] = (for (line <- bufferedSource.getLines) yield {
NPMPackage(line.trim)
}).toList
bufferedSource.close()
// requests
val serverHttpRequests = listOfPackages.map(pkg =>
(HttpRequest(
HttpMethods.GET,
uri = Uri(s"/${pkg.name}")
),
UUID.randomUUID().toString)
)
// source
val sourceList = Source(serverHttpRequests)
val bufferedFlow = Flow[(HttpRequest, String)]
.buffer(10, overflowStrategy = OverflowStrategy.backpressure)
.throttle(1, 3 seconds)
val dd = sourceList
.via(bufferedFlow).async
.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
.runForeach {
case (Success(response), oId) =>
println(s"$oId $response")
case (Failure(ex), oId) => println(ex)
}
With the code above I can print each response to the console. I want to know how to consume the entity and access the data from the response in a streamed way, not through a Future.
Following is the result of the existing code
Upvotes: 1
Views: 329
Reputation: 1760
You basically need to keep your logic within the Akka Streams API rather than terminating the stream with runForeach as you did.
A simplified example of this can look like this:
.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
.flatMapConcat {
case (Success(response), _) => Source.single(response)
case (Failure(_), _) => Source.empty //warning, ignoring errors
}
.map(httpResponse => httpResponse.entity)
.flatMapConcat(e => e.dataBytes.map(bytes => ???)) // dataBytes is the Scala DSL accessor; getDataBytes() is the Java API
.runWith(Sink.ignore)
Instead of runForeach, I am using flatMapConcat to get the HttpResponse, ignoring errors and the context of the request for simplicity of the example. Then I map to get the HttpEntity, and then use flatMapConcat again to get the ByteStrings representing the response body. There can be several of those for every HttpRequest, and that is what I am guessing you're referring to by "streamed way".
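For completeness, here is a minimal self-contained sketch of the same idea that keeps the request context next to each body chunk. The package names "express" and "lodash", the object name StreamedBodies, and the choice to only print chunk sizes are assumptions made for illustration; the explicit ActorMaterializer mirrors the question's setup and should not be needed on Akka 2.6 or later, where the implicit ActorSystem is enough.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.util.{Failure, Success}

object StreamedBodies extends App {
  implicit val system: ActorSystem = ActorSystem("NPMRegistry")
  // Same setup as in the question; deprecated (and unnecessary) on Akka 2.6+.
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // execution context for onComplete below

  // Hypothetical package names, paired with themselves as the request context.
  val requests = List("express", "lodash").map { name =>
    (HttpRequest(uri = Uri(s"/$name")), name)
  }

  val done = Source(requests)
    .via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
    .flatMapConcat {
      case (Success(response), name) =>
        // The entity body arrives as a stream of ByteString chunks;
        // tag every chunk with the package it belongs to.
        response.entity.dataBytes.map(chunk => (name, chunk))
      case (Failure(ex), name) =>
        // Log the failure and contribute no elements for this request.
        println(s"$name failed: $ex")
        Source.empty[(String, ByteString)]
    }
    .runForeach { case (name, chunk) =>
      println(s"$name: received ${chunk.size} bytes")
    }

  // Shut the actor system down once the whole stream has finished.
  done.onComplete(_ => system.terminate())
}
```

Because flatMapConcat fully drains one response body before moving on to the next, every entity is consumed and the connection pool does not stall on unread responses. If you need whole JSON documents rather than raw chunks, you would add a framing or folding step after dataBytes, which is left out here.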
Upvotes: 1