TheDarkSaint

Reputation: 457

Sink for line-by-line file IO with backpressure

I have a file processing job that currently uses Akka actors with manually managed backpressure to handle the processing pipeline, but I've never been able to successfully apply backpressure at the input file-reading stage.

This job takes an input file and groups lines by an ID number present at the start of each line. Once it hits a line with a new ID number, it pushes the grouped lines to a processing actor via a message, then continues with the new ID number, all the way until it reaches the end of the file.
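Roughly, the current loop looks like this (a simplified sketch; the pushGroup stand-in and the comma-after-ID line format are placeholders for my real code, which sends to an actor with manual backpressure bookkeeping):

import scala.io.Source

// stand-in for the existing send to the processing actor
def pushGroup(id: String, group: Vector[String]): Unit =
  println(s"$id -> ${group.size} lines")

var currentId: Option[String] = None
var buffer = Vector.empty[String]

for (line <- Source.fromFile("input.file").getLines()) {
  val id = line.takeWhile(_ != ',')  // ID at the start of each line (format assumed)
  if (currentId.exists(_ != id)) {   // new ID reached: flush the finished group
    pushGroup(currentId.get, buffer)
    buffer = Vector.empty
  }
  currentId = Some(id)
  buffer :+= line
}
currentId.foreach(pushGroup(_, buffer)) // flush the final group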

This seems like it would be a good use case for Akka Streams, using the File as a sink, but I'm still not sure of three things:

1) How can I read the file line by line?

2) How can I group by the ID present on every line? I currently use very imperative processing for this, and I don't think I'll have the same ability in a stream pipeline.

3) How can I apply backpressure, such that I don't keep reading lines into memory faster than I can process the data downstream?

Upvotes: 5

Views: 2381

Answers (2)

tariksbl

Reputation: 1119

Akka Streams' groupBy is one approach, but groupBy has a maxSubstreams parameter that would require you to know the maximum number of ID ranges up front. So the solution below uses scan to identify same-ID blocks and splitWhen to split them into substreams:

import java.io.File

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object Main extends App {
  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  // split a line into an (id, data) tuple
  def extractId(s: String) = {
    val a = s.split(",")
    a(0) -> a(1)
  }

  val file = new File("/tmp/example.csv")

  // FileIO.fromFile is deprecated in newer Akka versions; use FileIO.fromPath(file.toPath)
  private val lineByLineSource = FileIO.fromFile(file)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
    .map(_.utf8String)

  val future: Future[Done] = lineByLineSource
    .map(extractId)
    // flag each (id, data) tuple with whether it starts a new ID range
    .scan((false, "", ""))((l, r) => (l._2 != r._1, r._1, r._2))
    .drop(1)         // drop the scan seed element
    .splitWhen(_._1) // new substream at each start-of-range
    // collapse each substream into a single (id, lines) element
    .fold(("", Seq[String]()))((l, r) => (r._2, l._2 ++ Seq(r._3)))
    .concatSubstreams
    .runForeach(println)

  private val reply = Await.result(future, 10.seconds)
  println(s"Received $reply")
  Await.ready(system.terminate(), 10.seconds)
}

extractId splits each line into an id -> data tuple. scan prepends each id -> data tuple with a start-of-ID-range flag. drop(1) drops the primer (seed) element emitted by scan. splitWhen starts a new substream at each start-of-range. fold collapses each substream into an (id, lines) pair and removes the start-of-ID-range boolean, so that each substream produces a single element. In place of the fold you probably want a custom SubFlow that processes the stream of rows for a single ID and emits some result for the ID range. concatSubstreams merges the per-ID-range substreams produced by splitWhen back into a single stream, which is printed by runForeach.
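For example, a sketch of such a per-ID stage that counts rows instead of buffering them (IdSummary and the counting logic are illustrative assumptions, not part of the original pipeline):

// Illustrative only: the fold now produces one small summary per ID range
// instead of accumulating every line of the range in memory.
case class IdSummary(id: String, lineCount: Int)

val summaries: Future[Done] = lineByLineSource
  .map(extractId)
  .scan((false, "", ""))((l, r) => (l._2 != r._1, r._1, r._2))
  .drop(1)
  .splitWhen(_._1)
  .fold(IdSummary("", 0))((acc, row) => IdSummary(row._2, acc.lineCount + 1))
  .concatSubstreams
  .runForeach(println)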

Run against this example input:

$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof

Output is:

(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))
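For comparison, a rough sketch of the groupBy alternative mentioned at the top of this answer; it only works if you can bound the number of distinct IDs up front (the 1024 here is an arbitrary assumption), and it groups by key across the whole file rather than by consecutive runs:

// Rough sketch only: groupBy needs an upper bound on the number of distinct IDs.
val groupedFuture: Future[Done] = lineByLineSource
  .map(extractId)
  .groupBy(maxSubstreams = 1024, _._1) // one substream per ID
  // collapse each ID's substream into a single (id, lines) element
  .fold(("", Seq.empty[String]))((acc, kv) => (kv._1, acc._2 :+ kv._2))
  .mergeSubstreams
  .runForeach(println)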

Upvotes: 8

Aivean

Reputation: 10882

It appears that the easiest way to add backpressure to your system without introducing huge modifications is to simply give the Actor that consumes the input groups a bounded mailbox.

  1. Change the mailbox of the Actor that consumes your line groups to a bounded mailbox with a high mailbox-push-timeout-time, e.g. in application.conf (a sketch of this actor appears after these steps):

    bounded-mailbox {
      mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox"
      mailbox-capacity = 1
      mailbox-push-timeout-time = 1h
    }
    
    val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox"))
    
  2. Create an iterator from your file and build a grouped-by-ID iterator on top of it. Then just cycle through the data, sending the groups to the consuming Actor. Note that the send will block in this case when the Actor's mailbox gets full.

    // lazily groups consecutive elements that share the same key
    def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = {
      def rec(s: Stream[A]): Stream[Seq[A]] =
        if (s.isEmpty) Stream.empty
        else s.span(keyFun(s.head) == keyFun(_)) match {
          case (prefix, suffix) => prefix.toList #:: rec(suffix)
        }
      rec(iter.toStream).toIterator
    }
    
    import scala.io.Source

    val lines = Source.fromFile("input.file").getLines()
    
    iterGroupBy(lines)(l => l.takeWhile(_ != ',')) // group by the ID before the comma, not the first character
      .foreach { group: Seq[String] =>
        actor.tell(group, ActorRef.noSender) // blocks while the bounded mailbox is full
      }
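For reference, a minimal sketch of what InputGroupsConsumingActor might look like; the answer does not show its body, so this is an assumption:

    import akka.actor.Actor

    // Assumed actor body: while one group is being processed, the bounded mailbox
    // configured in step 1 holds at most mailbox-capacity further groups, so the
    // sending side blocks and the file reader is throttled.
    class InputGroupsConsumingActor extends Actor {
      def receive = {
        case group: Seq[_] =>
          // process one ID group here
          println(s"consumed group of ${group.size} lines")
      }
    }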
    

That's it! You probably want to move the file-reading loop to a separate thread, as it is going to block (sketched below). Also, by adjusting mailbox-capacity you can regulate the amount of memory consumed. If reading a batch from the file is always faster than processing it, it seems reasonable to keep the capacity small, like 1 or 2.
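For example, a hedged sketch of running the read-and-send loop off the calling thread (the single-thread executor is an assumption; any dedicated blocking dispatcher or plain Thread would do):

    import java.util.concurrent.Executors
    import scala.concurrent.{ExecutionContext, Future}
    import scala.io.Source

    // dedicated executor so the blocking sends don't tie up the default dispatcher
    val blockingEc = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

    val readerDone: Future[Unit] = Future {
      val lines = Source.fromFile("input.file").getLines()
      iterGroupBy(lines)(l => l.takeWhile(_ != ',')).foreach { group =>
        actor.tell(group, ActorRef.noSender) // blocks while the bounded mailbox is full
      }
    }(blockingEc)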

Update: iterGroupBy is implemented with Stream and has been tested not to produce a StackOverflowError.

Upvotes: 0
