Maxim Dvoryanchenko

Reputation: 121

Akka Streams recursive flow call

I'm trying to implement paging using Akka Streams. Currently I have:

case class SomeObject(id:Long, next_page:Option[Map[String,String]])
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] = {
    if (uri.isEmpty) return Future.successful(None)
    val response: Future[Response[T]] = sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
    response.map { resp =>
      resp.next_page match {
        case Some(next_page) => Some(next_page("uri"), resp.data)
        case _ => Some(Uri.Empty, resp.data)
      }
    }
  }
Source.single(someObject).map(obj => Uri(s"object/${obj.id}")).flatMapConcat(uri => Source.unfoldAsync(uri)(chainRequests)).map(...some processing goes here)

The problem is that if I do source.take(1000) and the paging has a lot of elements (pages), then downstream does not get new elements until Source.unfoldAsync finishes.

I also tried using a cycle in a Flow graph, like this:

val in = b.add(Flow[Uri])
val out = b.add(Flow[T])

val partition = b.add(Partition[Response[T]](2, r => r.next_page match { case Some(_) => 1; case None => 0 }))
val mergeUri = b.add(MergePreferred[Uri](1))
in ~> mergeUri ~> sendRequest ~> partition
      mergeUri.preferred <~ extractNextUri <~ partition.out(1)
      partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out
FlowShape(in.in, out.out)

But the above code does not work.

I'm stuck on creating my own GraphStage. unfoldAsync takes an initial element, but with the Flow solution I don't have a "first" element. Any suggestions?

Thanks

Upvotes: 1

Views: 416

Answers (2)

Eugene Maysyuk

Reputation: 3378

Source.unfoldAsync is what you are looking for.

I've prepared a simple project that traverses all pages of a REST API and accumulates the results from all pages, returning a Future[Seq[Cat]].

The complete, runnable source code is available on GitHub.

class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  override def getAllBreads: Future[Seq[Cat]] = {
    Source
      .unfoldAsync(start) {
        case Some(next) =>
          val nextChunkFuture: Future[CatsResponse] = sendRequest(next)

          // Always emit this page; a None next state ends the stream on the following pull.
          nextChunkFuture.map(resp => Some((resp.nextPageUrl, resp.data)))
        case None => Future.successful(None)
      }
      .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))
  }

  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")

    val request = HttpRequest(
      uri = Uri(url),
      headers = List(
        RawHeader("Accept", "application/json")
      )
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]

        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          // Drain the unread entity so the pooled connection is released.
          response.discardEntityBytes()
          throw new CatsHttpClientException()
      }
    }
  }
}
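
If the accumulated Seq is not needed, the same unfoldAsync source can be flattened instead of folded, so that items are emitted page by page. The following is only a minimal sketch under stated assumptions: `Page`, `pages`, `fetch`, `items` and `PagingSketch` are hypothetical names, the in-memory map stands in for the HTTP call, and it assumes Akka 2.6+ (where an implicit ActorSystem is enough to materialize a stream).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object PagingSketch {
  // Hypothetical page shape: the items on this page plus the key of the next page, if any.
  final case class Page(items: List[Int], next: Option[String])

  // Hypothetical in-memory stand-in for the paged REST API.
  private val pages = Map(
    "p1" -> Page(List(1, 2), Some("p2")),
    "p2" -> Page(List(3, 4), Some("p3")),
    "p3" -> Page(List(5), None)
  )

  private def fetch(key: String)(implicit ec: ExecutionContext): Future[Page] =
    Future(pages(key))

  // State is Option[String]: Some(key) means "fetch this page next", None means "stop".
  def items(start: String)(implicit ec: ExecutionContext): Source[Int, NotUsed] =
    Source
      .unfoldAsync[Option[String], List[Int]](Option(start)) {
        case Some(key) => fetch(key).map(page => Some((page.next, page.items)))
        case None      => Future.successful(None)
      }
      .mapConcat(pageItems => pageItems) // emit the items of each page one by one

  // Runs the stream for a single start key and keeps only the first three items.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("paging-sketch")
    import system.dispatcher
    try {
      val firstThree = Source(List("p1"))
        .flatMapConcat(key => items(key))
        .take(3)
        .runWith(Sink.seq)
      Await.result(firstThree, 5.seconds)
    } finally system.terminate()
  }
}
```

Calling `PagingSketch.run()` returns the first three items across pages. In a real client, `fetch` would be replaced by something like `sendRequest` above, and `Source(List("p1"))` by the stream of start URLs.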

Upvotes: 0

Maxim Dvoryanchenko

Reputation: 121

I found a solution by writing my own GraphStage:

final class PaginationGraphStage[S <: Uri, E](f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContextExecutor)
  extends GraphStage[FlowShape[S, E]]{
  val in: Inlet[S] = Inlet[S]("PaginationGraphStage.in")
  val out: Outlet[E] = Outlet[E]("PaginationGraphStage.out")
  override val shape: FlowShape[S, E] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {
      private[this] var state: S = _
      private[this] var inFlight = 0
      private[this] var asyncFinished = false
      private[this] def todo: Int = inFlight

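      // Invoked through the async callback, or directly from onPull if the future was already complete.
      def futureCompleted(result: Try[Option[(S, E)]]): Unit = {
        inFlight -= 1
        result match {
          case Failure(ex) => fail(out, ex)
          case Success(None) =>
            asyncFinished = true
            complete(out)
          case Success(Some((newS, elem))) if !newS.isEmpty =>
            // More pages to fetch: emit this page and remember the next URI.
            push(out, elem)
            state = newS
          case Success(Some((_, elem))) =>
            // Last page for the current URI: emit it, then take an already-available input or complete the stage.
            push(out, elem)
            asyncFinished = true
            if (isAvailable(in)) getHandler(in).onPush()
            else completeStage()
        }
      }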
      }

      private val futureCB = getAsyncCallback(futureCompleted)
      private val invokeFutureCB: Try[Option[(S, E)]] => Unit = futureCB.invoke

      private def pullIfNeeded(): Unit = {
        if (!hasBeenPulled(in)) tryPull(in)
      }
      override def onUpstreamFinish(): Unit = {
        if (todo == 0) completeStage()
      }

      def onPull(): Unit = {
        if (state != null) {
          asyncFinished = false
          inFlight += 1
          val future = f(state)
          future.value match {
            case None => future.onComplete(invokeFutureCB)
            case Some(v) => futureCompleted(v)
          }
        } else {
          pullIfNeeded()
        }
      }

      override def onPush(): Unit = {
        if (state == null) {
          inFlight += 1
          state = grab(in)
          pullIfNeeded()
          getHandler(out).onPull()
        }
        if (asyncFinished) {
          inFlight += 1
          state = grab(in)
          pullIfNeeded()
        }
      }
      setHandlers(in, out, this)
    }
}

Upvotes: 1
