Guillaume Massé

Reputation: 8064

How to create an akka-stream Source from a Flow that generates values recursively?

I need to traverse an API that is shaped like a tree, for example a directory structure or discussion threads. It can be modeled via the following flow:

type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString 

// 0 => [1, 2, ..., 9]
// 1 => [10, 11, ..., 19]
// 2 => [20, 21, ..., 29]
// ...
// 9 => [90, 91, ..., 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
  if (id == 0) (1 to 9).toList
  else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
  else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] = 
  Flow.fromFunction(id => Item(randomData(), nested(id)))

How can I traverse this data? I got the following working:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop = 
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val source = b.add(Flow[Int])
    val merge  = b.add(Merge[Int](2))
    val fetch  = b.add(itemFlow) 
    val bcast  = b.add(Broadcast[Item](2))

    val kids   = b.add(Flow[Item].mapConcat(_.kids))
    val data   = b.add(Flow[Item].map(_.data))

    val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

    source ~> merge ~> fetch           ~> bcast ~> data
              merge <~ buffer <~ kids  <~ bcast

    FlowShape(source.in, data.out)
  }

val flow = Flow.fromGraph(loop)


Await.result(
  Source.single(0).via(flow).runWith(Sink.foreach(println)),
  Duration.Inf
)

system.terminate()

However, since I'm using a flow with a buffer, the stream will never complete. The Flow.buffer documentation states:

    Completes when upstream completes and buffered elements have been drained

I read the Graph cycles, liveness, and deadlocks section multiple times and I'm still struggling to find an answer.

This would create a livelock:

import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
  // keep track of how many elements are still in flight (1 = the seed)
  val remaining = new AtomicInteger(1)

  // must be larger than the maximum number of children returned by loop(x)
  val bufferSize = 10000

  val (ref, publisher) =
    Source.actorRef[S](bufferSize, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both)
      .run()

  ref ! seed

  Source.fromPublisher(publisher)
    .via(flow)
    .map{x =>
      loop(x).foreach{ c =>
        remaining.incrementAndGet()
        ref ! c
      }
      x
    }
    .takeWhile(_ => remaining.decrementAndGet() > 0)
}
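
For reference, the intended call site for such an unfold, using the question's itemFlow, would be something like:

val items: Source[Item, NotUsed] = unfold(0, itemFlow)(_.kids)

items.map(_.data).runWith(Sink.foreach(println))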

EDIT: I added a git repo to test your solution https://github.com/MasseGuillaume/source-unfold

Upvotes: 24

Views: 2894

Answers (3)

Guillaume Massé

Reputation: 8064

I solved this problem by writing my own GraphStage.

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.ExecutionContext

import scala.collection.mutable
import scala.util.{Success, Failure, Try}

def unfoldTree[S, E](seeds: List[S], 
                     flow: Flow[S, E, NotUsed],
                     loop: E => List[S],
                     bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
  Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}

object UnfoldSource {
  implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
    def dequeueN(n: Int): List[A] = {
      val b = List.newBuilder[A]
      var i = 0
      while (i < n) {
        val e = self.dequeue()
        b += e
        i += 1
      }
      b.result()
    }
  }
}

class UnfoldSource[S, E](seeds: List[S],
                         flow: Flow[S, E, NotUsed],
                         loop: E => List[S],
                         bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {

  val out: Outlet[E] = Outlet("UnfoldSource.out")
  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {  
    // Nodes to expand
    val frontier = mutable.Queue[S]()
    frontier ++= seeds

    // Nodes expanded
    val buffer = mutable.Queue[E]()

    // Using the flow to fetch more data
    var inFlight = false

    // Sink pulled but the buffer was empty
    var downstreamWaiting = false

    def isBufferFull() = buffer.size >= bufferSize

    def fillBuffer(): Unit = {
      val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
      val batch = frontier.dequeueN(batchSize)
      inFlight = true

      val toProcess =
        Source(batch)
          .via(flow)
          .runWith(Sink.seq)(materializer)

      val callback = getAsyncCallback[Try[Seq[E]]]{
        case Failure(ex) => {
          fail(out, ex)
        }
        case Success(es) => {
          val got = es.size
          inFlight = false
          es.foreach{ e =>
            buffer += e
            frontier ++= loop(e)
          }
          if (downstreamWaiting && buffer.nonEmpty) {
            val e = buffer.dequeue()
            downstreamWaiting = false
            sendOne(e)
          } else {
            checkCompletion()
          }
          ()
        }
      }

      toProcess.onComplete(callback.invoke)
    }
    override def preStart(): Unit = {
      checkCompletion()
    }

    def checkCompletion(): Unit = {
      if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
        completeStage()
      }
    } 

    def sendOne(e: E): Unit = {
      push(out, e)
      checkCompletion()
    }

    def onPull(): Unit = {
      if (buffer.nonEmpty) {
        sendOne(buffer.dequeue())
      } else {
        downstreamWaiting = true
      }

      if (!isBufferFull() && frontier.nonEmpty) {
        fillBuffer()
      }
    }

    setHandler(out, this)
  }
}
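
Applied to the question's setup, a call site might look like this (a sketch, reusing the question's itemFlow and its imports for ActorSystem, Await, and Duration):

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

val allData =
  unfoldTree[ItemId, Item](List(0), itemFlow, _.kids, bufferSize = 100)
    .map(_.data)
    .runWith(Sink.seq)

Await.result(allData, Duration.Inf)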

Upvotes: 3

Cause of Non-Completion

I don't think the cause of the stream never completing is "using a flow with a buffer". The actual cause, similar to this question, is that Merge with its default parameter eagerComplete = false waits for both the source and the buffer to complete before it (the merge) completes. But the buffer is waiting on the merge. So the merge is waiting on the buffer, and the buffer is waiting on the merge.


You could set eagerComplete = true when creating your Merge, but eager completion may unfortunately result in some child ItemId values never being queried.
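
In the question's GraphDSL this is a one-line change, using Merge's eagerComplete constructor parameter:

val merge  = b.add(Merge[Int](2, eagerComplete = true))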

Indirect Solution

If you materialize a new stream for each level of the tree then the recursion can be extracted to outside of the stream.

You can construct a query function utilizing the itemFlow:

val itemQuery : Iterable[ItemId] => Future[Seq[Item]] = 
  (itemIds) => Source(itemIds.toList)
                 .via(itemFlow)
                 .runWith(Sink.seq[Item])

This query function can now be wrapped inside of a recursive helper function:

val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] = 
  (itemIds, currentData) => itemQuery(itemIds) flatMap { allNewItems =>
      val allNewKids = allNewItems.flatMap(_.kids).toSet
      val allData    = currentData ++ allNewItems.map(_.data)

      if(allNewKids.isEmpty)
        Future.successful(allData.toSeq)
      else
        recQuery(allNewKids, allData)
  }
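
Kicking off the traversal from the root then looks like this (assuming an implicit ExecutionContext is in scope for the flatMap):

val allData : Future[Seq[Data]] = recQuery(List(0), Nil)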

The number of streams created will be equal to the maximum depth of the tree.

Unfortunately, because Futures are involved, this recursive function is not tail-recursive and could result in a "stack overflow" if the tree is too deep.

Upvotes: 5

Astrid

Reputation: 1828

Ah, the joys of cycles in Akka streams. I had a very similar problem which I solved in a deeply hacky way. Possibly it'll be helpful for you.

Hacky Solution:

  // add a graph stage that will complete successfully if it sees no element within 5 seconds
  // (needs: import java.util.concurrent.TimeoutException
  //         import scala.concurrent.duration._)
  val timedStopper = b.add(
    Flow[Item]
      .idleTimeout(5.seconds)
      .recoverWithRetries(1, {
        case _: TimeoutException => Source.empty[Item]
      }))

  source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
  merge <~ buffer <~ kids <~ bcast

Five seconds after the last element passes through the timedStopper stage, that stage completes the stream successfully. It does so via idleTimeout, which fails the stream with a TimeoutException, combined with recoverWithRetries, which turns that failure into a successful completion. (I did mention it was hacky.)

This is obviously not suitable if you might have more than 5 seconds between elements, or if you can't afford a long wait between the stream "actually" completing and Akka picking up on it. Thankfully, neither was a concern for us, and in that case it actually works pretty well!

Non-hacky solution

Unfortunately, the only ways I can think of to do this without cheating via timeouts are very, very complicated.

Basically, you need to be able to track two things:

  • are there any elements still in the buffer, or in process of being sent to the buffer
  • is the incoming source open

and complete the stream if and only if the answer to both questions is no. Native Akka building blocks are probably not going to be able to handle this. A custom graph stage might, however. An option might be to write one that takes the place of Merge and give it some way of knowing about the buffer contents, or possibly have it track both the IDs it receives and the IDs the broadcast is sending to the buffer. The problem being that custom graph stages are not particularly pleasant to write at the best of times, let alone when you're mixing logic across stages like this.

Warnings

Akka streams just don't work well with cycles, especially how they calculate completion. As a result, this may not be the only problem you encounter.

For instance, an issue we had with a very similar structure was that a failure in the source was treated as the stream completing successfully, with a succeeded Future being materialised. The problem is that by default, a stage that fails will fail its downstreams but cancel its upstreams (which counts as a successful completion for those stages). With a cycle like the one you have, the result is a race as cancellation propagates down one branch but failure down the other. You also need to check what happens if the sink errors; depending on the cancellation settings for broadcast, it's possible the cancellation will not propagate upwards and the source will happily continue pulling in elements.

One final option: avoid handling the recursive logic with streams at all. On one extreme, if there's any way for you to write a single tail-recursive method that pulls out all the nested items at once, you could put that into a Flow stage and solve your problems (see the sketch below). On the other, we're seriously considering moving to Kafka queueing for our own system.
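
As a minimal sketch of that last idea, assuming a hypothetical synchronous fetchItem: ItemId => Item were available (the question only exposes itemFlow, so this is purely illustrative):

// hypothetical synchronous fetch; the question only has itemFlow
def fetchItem(id: ItemId): Item = ???

// tail-recursive traversal collecting all data reachable from a root
def fetchAll(rootId: ItemId): List[Data] = {
  @annotation.tailrec
  def go(frontier: List[ItemId], acc: List[Data]): List[Data] =
    frontier match {
      case Nil => acc.reverse
      case id :: rest =>
        val item = fetchItem(id)
        go(rest ++ item.kids, item.data :: acc)
    }
  go(List(rootId), Nil)
}

val treeFlow: Flow[ItemId, Data, NotUsed] =
  Flow[ItemId].mapConcat(fetchAll)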

Upvotes: 0
