I'm trying to implement paging using Akka Streams. Currently I have:
case class SomeObject(id: Long, next_page: Option[Map[String, String]])

// Fetches one page and returns (uri of the next page, data of this page);
// Uri.Empty marks the last page, so the following call ends the unfold with None
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] =
  if (uri.isEmpty) Future.successful(None)
  else {
    val response: Future[Response[T]] =
      sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
    response.map { resp =>
      resp.next_page match {
        case Some(next_page) => Some((Uri(next_page("uri")), resp.data))
        case None            => Some((Uri.Empty, resp.data))
      }
    }
  }

Source.single(someObject)
  .map(obj => Uri(s"object/${obj.id}"))
  .map(uri => Source.unfoldAsync(uri)(chainRequests))
  .map(...) // some processing goes here
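A minimal, stdlib-only sketch of the contract chainRequests has to satisfy, assuming String stands in for Uri ("" playing the role of Uri.Empty) and a hypothetical in-memory `pages` map stands in for the HTTP calls. The hypothetical `unfoldAll` helper calls `f` with the current state until it returns None, which is the same sequence of calls Source.unfoldAsync makes; the difference is that it collects every page into a List instead of emitting each page downstream as soon as it arrives.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object UnfoldSketch {
  // Hypothetical in-memory "API": page URI -> (data on that page, next page URI if any)
  val pages: Map[String, (Seq[Int], Option[String])] = Map(
    "object/1?page=1" -> ((Seq(1, 2), Some("object/1?page=2"))),
    "object/1?page=2" -> ((Seq(3, 4), Some("object/1?page=3"))),
    "object/1?page=3" -> ((Seq(5), None))
  )

  // Same contract as chainRequests, with "" standing in for Uri.Empty
  def chainRequests(uri: String): Future[Option[(String, Seq[Int])]] =
    if (uri.isEmpty) Future.successful(None)
    else Future(pages(uri)).map { case (data, next) => Some((next.getOrElse(""), data)) }

  // Hypothetical helper: the calls Source.unfoldAsync would make, collected into a List
  def unfoldAll[S, E](s: S)(f: S => Future[Option[(S, E)]]): Future[List[E]] =
    f(s).flatMap {
      case None            => Future.successful(List.empty[E])
      case Some((next, e)) => unfoldAll(next)(f).map(e :: _)
    }
}
```

The last page returns "" as its next state, so the unfold needs one extra call (which returns None) to finish.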
The problem is that if I do source.take(1000) and paging produces many elements (pages), then downstream does not get any new elements until Source.unfoldAsync finishes.
I also tried building the Flow with a cycle, like this:
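val in = builder.add(Flow[Uri])
val out = builder.add(Flow[T])
// out(1): the page has a next_page link, out(0): last page
val partition = builder.add(Partition[Response[T]](2, r => r.next_page match {
  case Some(_) => 1
  case None    => 0
}))
// .preferred exists only on MergePreferred, and the merged elements are Uris
val mergeUri = builder.add(MergePreferred[Uri](1))

in ~> mergeUri ~> sendRequest ~> partition
// note: pages routed to out(1) only feed the cycle, so their data never reaches out
mergeUri.preferred <~ extractNextUri <~ partition.out(1)
partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out

FlowShape(in.in, out.out)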
But the code above does not work.
I'm stuck trying to write my own GraphStage. unfoldAsync takes an initial element, but in the Flow-based solution I don't have a "first" element. Any suggestions?
Thanks
Source.unfoldAsync is what you are looking for.
I've prepared a simple project that traverses all pages of a REST API and accumulates the results from every page, returning a Future of Seq.
The complete source code, as a project you can run, is on GitHub.
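// Assumes Akka 2.6 (typed ActorSystem) and Akka HTTP; Cat, CatsResponse, CatsHttpClient,
// CatsHttpClientException and the implicit Unmarshaller for CatsResponse come from the project
import akka.actor.typed.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, StatusCodes, Uri}
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Sink, Source}
import org.slf4j.{Logger, LoggerFactory}
import scala.concurrent.{ExecutionContext, Future}

class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  override def getAllBreads: Future[Seq[Cat]] =
    Source
      // State = URL of the next page to fetch; None means there are no more pages
      .unfoldAsync(start) {
        case Some(next) =>
          sendRequest(next).map { resp =>
            // Emit this page's data and carry the next page URL (None on the last page)
            Some((resp.nextPageUrl, resp.data))
          }
        case None => Future.successful(None)
      }
      // Concatenate the data of all pages into a single Seq
      .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))

  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")
    val request = HttpRequest(
      uri = Uri(url),
      headers = List(RawHeader("Accept", "application/json"))
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]
        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          // Drain the unused entity so the pooled connection is released
          response.discardEntityBytes()
          Future.failed(new CatsHttpClientException())
      }
    }
  }
}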
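The same unfold pattern can be tried without any HTTP. This sketch assumes Akka 2.6+ (where a classic ActorSystem in implicit scope supplies the Materializer) and replaces sendRequest with a hypothetical `fetchPage` over an in-memory map; the example.test URLs and breed lists are made up. As in getAllBreads, the unfold state is an Option holding the next page URL, and the stream completes once that state becomes None.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CatsStyleUnfold {
  // Hypothetical stand-in for the REST API: page URL -> (items on that page, next page URL)
  val pages: Map[String, (Seq[String], Option[String])] = Map(
    "https://example.test/breeds?page=1" -> ((Seq("Abyssinian", "Aegean"), Some("https://example.test/breeds?page=2"))),
    "https://example.test/breeds?page=2" -> ((Seq("Bengal"), None))
  )

  // Plays the role of sendRequest: one asynchronous call per page (fails for an unknown URL)
  def fetchPage(url: String)(implicit ec: ExecutionContext): Future[(Seq[String], Option[String])] =
    Future(pages(url))

  // Same shape as getAllBreads: the state is the next page URL, None means "no more pages"
  def getAll(start: String)(implicit system: ActorSystem): Future[Seq[String]] = {
    implicit val ec: ExecutionContext = system.dispatcher
    Source
      .unfoldAsync[Option[String], Seq[String]](Some(start)) {
        case Some(url) => fetchPage(url).map { case (data, next) => Some((next, data)) }
        case None      => Future.successful(None)
      }
      .runWith(Sink.fold[Seq[String], Seq[String]](Seq.empty)(_ ++ _))
  }
}
```

Because the last page still emits its data with a None state, the stream needs one more call (returning None) before it completes.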
I found a solution by writing my own GraphStage:
import akka.http.scaladsl.model.Uri
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// For every upstream seed S, calls f repeatedly and emits one E per page; a seed is done when
// f returns None or an empty next Uri, after which the next seed is pulled from upstream
final class PaginationGraphStage[S <: Uri, E](f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[S, E]] {
  val in: Inlet[S] = Inlet[S]("PaginationGraphStage.in")
  val out: Outlet[E] = Outlet[E]("PaginationGraphStage.out")
  override val shape: FlowShape[S, E] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Next page to request for the current seed; None = idle, take the next seed
      private[this] var state: Option[S] = None
      // True while a future returned by f has not completed yet
      private[this] var inFlight = false

      private def futureCompleted(result: Try[Option[(S, E)]]): Unit = {
        inFlight = false
        result match {
          case Failure(ex) => failStage(ex)
          case Success(Some((next, elem))) =>
            // An empty next Uri means this was the current seed's last page
            state = if (next.isEmpty) None else Some(next)
            push(out, elem)
            if (state.isEmpty && isClosed(in)) completeStage()
          case Success(None) =>
            // No page for this seed; nothing was pushed, so downstream is still waiting
            state = None
            if (isClosed(in)) completeStage() else pull(in)
        }
      }

      private val futureCB = getAsyncCallback[Try[Option[(S, E)]]](futureCompleted)

      private def fetch(s: S): Unit = {
        inFlight = true
        val future = f(s)
        future.value match {
          case Some(v) => futureCompleted(v) // already completed: handle it synchronously
          case None    => future.onComplete(futureCB.invoke)
        }
      }

      override def onPull(): Unit = state match {
        case Some(s) => fetch(s)
        case None    => if (isClosed(in)) completeStage() else pull(in)
      }

      override def onPush(): Unit = fetch(grab(in))

      // Let the current seed finish paging before completing the stage
      override def onUpstreamFinish(): Unit =
        if (state.isEmpty && !inFlight) completeStage()

      setHandlers(in, out, this)
    }
}
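For comparison, the same Flow shape (one seed in, one element per page out) can probably be assembled from built-in operators instead of a custom stage: flatMapConcat runs Source.unfoldAsync for one upstream seed at a time and forwards each page as soon as it is fetched, so a downstream take(n) should stop paging early. This is a swapped-in technique, not the poster's solution; it assumes Akka 2.6+, and the `paginate` helper, the `fetch` function and the 10-pages-per-seed fake data are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PaginateFlow {
  // Hypothetical helper: unfold the pages of each seed; seeds are processed one after another
  def paginate[S, E](f: S => Future[Option[(S, E)]]): Flow[S, E, NotUsed] =
    Flow[S].flatMapConcat(seed => Source.unfoldAsync(seed)(f))

  // Counts how many "requests" the fake fetcher has served
  val requests = new AtomicInteger(0)

  // Hypothetical fetcher: state = (seed, page number); each seed has 10 pages
  val fetch: ((String, Int)) => Future[Option[((String, Int), String)]] = {
    case (seed, page) =>
      requests.incrementAndGet()
      if (page > 10) Future.successful(None)
      else Future.successful(Some(((seed, page + 1), s"$seed-page$page")))
  }
}
```

With the question's types this would read Flow[Uri].flatMapConcat(uri => Source.unfoldAsync(uri)(chainRequests)), since chainRequests already returns None once it is called with Uri.Empty.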