Reputation: 457
I have a file processing job that currently uses akka actors with manually managed backpressure to handle the processing pipeline, but I've never been able to successfully manage the backpressure at the input file reading stage.
This job takes an input file and groups lines by an ID number present at the start of each line, and then once it hits a line with a new ID number, it pushes the grouped lines to a processing actor via message, and then continues with the new ID number, all the way until it reaches the end of the file.
This seems like it would be a good use case for Akka Streams, using the file as a source, but I'm still not sure of three things:
1) How can I read the file line by line?
2) How can I group by the ID present on every line? I currently use very imperative processing for this, and I don't think I'll have the same ability in a stream pipeline.
3) How can I apply backpressure, such that I don't keep reading lines into memory faster than I can process the data downstream?
Upvotes: 5
Views: 2381
Reputation: 1119
Akka Streams' groupBy is one approach. However, groupBy takes a maxSubstreams parameter, which would require you to know the maximum number of ID ranges up front. The solution below instead uses scan to identify same-ID blocks and splitWhen to split them into substreams:
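import java.io.File

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object Main extends App {
  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  // Split an "ID,data" line into an (id, data) pair.
  def extractId(s: String) = {
    val a = s.split(",")
    a(0) -> a(1)
  }

  val file = new File("/tmp/example.csv")

  // Emit the file one line at a time; a line longer than 1024 bytes fails the stream.
  // Later Akka versions deprecate FileIO.fromFile in favor of FileIO.fromPath.
  private val lineByLineSource = FileIO.fromFile(file)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
    .map(_.utf8String)

  val future: Future[Done] = lineByLineSource
    .map(extractId)
    .scan((false, "", ""))((l, r) => (l._2 != r._1, r._1, r._2)) // flag the first row of each ID
    .drop(1) // drop the scan seed
    .splitWhen(_._1) // start a new substream per ID range
    .fold(("", Seq[String]()))((l, r) => (r._2, l._2 ++ Seq(r._3))) // collect rows per ID
    .concatSubstreams
    .runForeach(println)

  private val reply = Await.result(future, 10.seconds)
  println(s"Received $reply")
  Await.ready(system.terminate(), 10.seconds)
}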
extractId splits lines into id -> data tuples. scan prepends a start-of-ID-range flag to each id -> data tuple. drop removes the seed element that scan emits first. splitWhen starts a new substream at each start-of-range flag. fold concatenates each substream into a list and removes the start-of-ID-range boolean, so that each substream produces a single element. In place of the fold you probably want a custom SubFlow that processes the stream of rows for a single ID and emits some result for the ID range. concatSubstreams merges the per-ID-range substreams produced by splitWhen back into a single stream, which is printed by runForeach.
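To see what the scan and drop steps produce, here is a minimal sketch that applies the same seed and step function to a plain List with scanLeft, so it runs without Akka. The sample rows and the names rows, flagged and groups are made up for illustration, and the final foldLeft only imitates what splitWhen plus fold do per substream.

```scala
// Hypothetical sample rows, already split into (id, data) pairs as extractId would produce.
val rows = List("ID1" -> "a", "ID1" -> "b", "ID2" -> "c")

// Same seed and step function as the stream's scan; like scan, scanLeft emits the seed first.
val flagged = rows
  .scanLeft((false, "", "")) { (prev, row) => (prev._2 != row._1, row._1, row._2) }
  .drop(1) // discard the seed element

// Imitate splitWhen + fold: a true flag opens a new group, a false flag extends the last one.
val groups = flagged.foldLeft(Vector.empty[(String, Vector[String])]) {
  case (acc, (true, id, data)) => acc :+ (id -> Vector(data))
  case (acc, (false, _, data)) =>
    val (id, collected) = acc.last
    acc.init :+ (id -> (collected :+ data))
}
```

Here flagged is List((true,ID1,a), (false,ID1,b), (true,ID2,c)): only the first row of each ID range carries true, which is exactly the predicate splitWhen needs.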
Run with:
$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof
Output is:
(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))
Upvotes: 8
Reputation: 10882
The easiest way to add back pressure to your system without major modifications is probably to change the mailbox type of the actor that consumes the input groups to a bounded mailbox.
Configure the actor that consumes your lines with a bounded mailbox and a high mailbox-push-timeout-time:
bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox"
  mailbox-capacity = 1
  mailbox-push-timeout-time = 1h
}
val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox"))
Create an iterator from your file, then create a grouped (by ID) iterator from that iterator. Then just cycle through the data, sending groups to the consuming actor. Note that the send will block when the actor's mailbox is full.
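import akka.actor.ActorRef
import scala.io.Source

// Lazily group consecutive elements that share the same key.
def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = {
  def rec(s: Stream[A]): Stream[Seq[A]] =
    if (s.isEmpty) Stream.empty else {
      s.span(keyFun(s.head) == keyFun(_)) match {
        case (prefix, suffix) => prefix.toList #:: rec(suffix)
      }
    }
  rec(iter.toStream).toIterator
}

val lines = Source.fromFile("input.file").getLines()
// headOption keys each line by its first character only; substitute real ID extraction here.
iterGroupBy(lines){l => l.headOption}.foreach {
  group: Seq[String] =>
    // Blocks (up to the push timeout) while the bounded mailbox is full.
    actor.tell(group, ActorRef.noSender)
}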
That's it!
You probably want to move the file reading to a separate thread, since it is going to block. Also, by adjusting mailbox-capacity you can regulate the amount of memory consumed. But if reading a batch from the file is always faster than processing it, it seems reasonable to keep the capacity small, like 1 or 2.
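The effect of a capacity of 1 can be illustrated without Akka. The sketch below uses java.util.concurrent.ArrayBlockingQueue as a stand-in for the bounded mailbox; that substitution, the 10 ms timeout and the sample messages are assumptions for illustration, not the mailbox's actual implementation.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Stand-in for a mailbox with mailbox-capacity = 1.
val mailbox = new ArrayBlockingQueue[Seq[String]](1)

// The first group fits immediately.
val firstAccepted = mailbox.offer(Seq("ID1,a", "ID1,b"), 10, TimeUnit.MILLISECONDS)

// The queue is full, so this offer waits for the timeout and then gives up.
val secondAccepted = mailbox.offer(Seq("ID2,c"), 10, TimeUnit.MILLISECONDS)

// Once the consumer takes a group, there is room for the next one.
val consumed = mailbox.take()
val thirdAccepted = mailbox.offer(Seq("ID2,c"), 10, TimeUnit.MILLISECONDS)
```

A timed offer that fails corresponds to the push timeout expiring. As far as I know, Akka then routes the message to dead letters instead of delivering it, which is why the configuration above sets the timeout so high.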
Update: iterGroupBy is implemented with Stream and was tested not to produce a StackOverflowError.
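Since Stream is deprecated as of Scala 2.13 (in favor of LazyList), an alternative is to do the grouping directly on a buffered iterator. This is a sketch of a swapped-in technique, not the implementation above; groupConsecutive is a hypothetical name, and the key function in the usage lines assumes the ID is everything before the first comma.

```scala
// Groups consecutive elements that share a key, holding only the current group in memory.
def groupConsecutive[A, K](iter: Iterator[A])(key: A => K): Iterator[Seq[A]] = {
  val buffered = iter.buffered // lets us peek at the next element via head
  new Iterator[Seq[A]] {
    def hasNext: Boolean = buffered.hasNext
    def next(): Seq[A] = {
      val currentKey = key(buffered.head)
      val group = Vector.newBuilder[A]
      // Consume elements until the key changes or the input ends.
      while (buffered.hasNext && key(buffered.head) == currentKey) group += buffered.next()
      group.result()
    }
  }
}

// Usage with made-up lines; the ID is assumed to be the text before the first comma.
val sampleLines = Iterator("ID1,a", "ID1,b", "ID2,c", "ID3,d", "ID3,e")
val sampleGroups = groupConsecutive(sampleLines)(_.takeWhile(_ != ',')).toList
```

Because it loops instead of recursing, there is no stack depth to worry about, and nothing is memoized beyond the group currently being built.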
Upvotes: 0