Reputation: 392
I am processing a stream of data from a file that is grouped by a key. I have created a class KeyChanges[T, K] with an apply method that can be used to split the stream by key. Before the first item of a substream is processed, I need to retrieve some data from a DB. Once each substream is completed, I need to emit a message to a queue. With a standard Scala sequence I would do something like this:
val groups: Map[Key, Seq[Value]] = stream.groupBy(v => v.k)

val groupSummaryF = Future.sequence(groups.map { case (k, group) =>
  retrieveMyData(k).flatMap { data =>
    Future.sequence(group.map(v => process(data, v))).map { results =>
      k -> results.foldLeft(0) { (a, t) =>
        t match {
          case Success(v) => a + 1
          case Failure(ex) =>
            println(s"failure: $ex")
            a
        }
      }
    }.andThen {
      case Success((key, count)) =>
        sendMessage(count, key)
    }
  }
})
I would like to do something similar with Akka Streams. For the data retrieval, I could just cache the data and call the retrieval function for each element, but for the queue message I really do need to know when the substream is completed. So far I have not seen a way around this. Any ideas?
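To make the intent concrete, the per-group success count from the snippet above can be exercised standalone with plain collections. `process` here is a hypothetical stand-in that returns a `Try`, failing on odd values:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for process: succeeds on even values, fails on odd ones.
def process(data: String, v: Int): Try[Int] =
  if (v % 2 == 0) Success(v) else Failure(new RuntimeException(s"odd: $v"))

// Same fold as in the snippet above: count successes, log failures.
def countSuccesses(data: String, group: Seq[Int]): Int =
  group.map(v => process(data, v)).foldLeft(0) { (a, t) =>
    t match {
      case Success(_) => a + 1
      case Failure(ex) =>
        println(s"failure: $ex")
        a
    }
  }

val count = countSuccesses("db-data", Seq(1, 2, 3, 4))  // 2 successes
```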
Upvotes: 2
Views: 828
Reputation: 13749
You can just run the stream and execute the action from a Sink.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future
import scala.util.Random

implicit val system: ActorSystem = ActorSystem("example")
import system.dispatcher

val categories = Array("DEBUG", "INFO", "WARN", "ERROR")

// assume we have a stream from a file which produces categoryId -> message
val lines = (1 to 100).map(x => (Random.nextInt(categories.length), s"message $x"))

def loadDataFromDatabase(categoryId: Int): Future[String] =
  Future.successful(categories(categoryId))

// assume this emits a message to the queue
def emitToQueue(x: (String, Int)): Unit =
  println(s"${x._2} messages from category ${x._1}")

val flow =
  Flow[(Int, String)]
    .groupBy(4, _._1)
    .fold((0, List.empty[String])) { case ((_, acc), (catId, elem)) =>
      (catId, elem :: acc)
    }
    .mapAsync(1) { case (catId, messages) =>
      // here you load your stuff from the database
      loadDataFromDatabase(catId).map(cat => (cat, messages))
    }
    // here you may want to do some more processing
    .map(x => (x._1, x._2.size))
    .mergeSubstreams

// assume the source is a file
Source.fromIterator(() => lines.iterator)
  .via(flow)
  .to(Sink.foreach(emitToQueue))
  .run()
If you want to run it for multiple files and, for example, report the sums once at the end, you can do it like this:
import akka.stream.scaladsl.Keep

val futures = (1 to 4).map { _ =>
  Source.fromIterator(() => lines.iterator)
    .via(flow)
    .toMat(Sink.seq[(String, Int)])(Keep.right)
    .run()
}

Future.sequence(futures).map { results =>
  results.flatten.groupBy(_._1).foreach { case (cat, xs) =>
    val total = xs.map(_._2).sum
    println(s"$total messages from category $cat")
  }
}
As you can see, running the flow gives you a future. It completes with the materialized value (the result of the flow) when the stream is finished, and you can then do whatever you want with it.
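The same completion pattern, stripped of Akka, looks like this with plain Futures; the data and names here are illustrative stand-ins for the materialized `Sink.seq` results:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-ins for the materialized values of several streams (e.g. Sink.seq results).
val futures = (1 to 4).map(i => Future(Seq((s"cat$i", i), (s"cat$i", i))))

// Combine them and act once every stream has finished.
val totals: Future[Map[String, Int]] =
  Future.sequence(futures).map { results =>
    results.flatten.groupBy(_._1).map { case (cat, xs) => cat -> xs.map(_._2).sum }
  }

val result = Await.result(totals, 5.seconds)
// result("cat3") == 6
```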
Upvotes: 1