AlphaGeek

Reputation: 392

Performing Actions Before/After substream in akka-streams 2.4+

I am processing a stream of data from a file that is grouped by a key. I have written a class, KeyChanges[T,K], whose apply method can be used to split the stream by key. Before the first item of a substream is processed, I need to retrieve some data from a DB. Once each substream has completed, I need to emit a message to a queue. With a standard Scala sequence I would do something like this:

val groups: Map[Key, Seq[Value]] = stream.groupBy(v => v.k)
val groupSummaryF = Future.sequence(groups.map { case (k, group) =>
  retrieveMyData(k).flatMap { data =>
    Future.sequence(group.map(v => process(data, v))).map(
      k -> _.foldLeft(0) { (a,t) =>
        t match {
          case Success(v) => a + 1
          case Failure(ex) =>
            println(s"failure: $ex")
            a
        }
      }
    ).andThen {
      case Success((key,count)) =>
        sendMessage(count,key)
    }
  }
})

I would like to do something similar with Akka Streams. For the data retrieval, I could cache the data and call the retrieval function for each element, but for the queue message I really do need to know when the substream has completed. So far I have not found a way to do this. Any ideas?

Upvotes: 2

Views: 828

Answers (1)

lpiepiora

Reputation: 13749

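You can fold each substream into a single element, merge the substreams back, and perform the action in the Sink. The folded element is only emitted once its substream has completed.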

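import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future
import scala.util.Random

// needed to run the streams and to map over the futures below
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
import system.dispatcher

val categories = Array("DEBUG", "INFO", "WARN", "ERROR")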

// assume we have a stream from file which produces categoryId -> message
val lines = (1 to 100).map(x => (Random.nextInt(categories.length), s"message $x"))

def loadDataFromDatabase(categoryId: Int): Future[String] =
  Future.successful(categories(categoryId))

// assume this emits message to the queue
def emitToQueue(x: (String, Int)): Unit =
  println(s"${x._2} messages from category ${x._1}")

val flow =
  Flow[(Int, String)].
    groupBy(4, _._1).
    fold((0, List.empty[String])) { case ((_, acc), (catId, elem)) =>
      (catId, elem :: acc)
    }.
    mapAsync(1) { case (catId, messages) =>
      // here you load your stuff from the database
      loadDataFromDatabase(catId).map(cat => (cat, messages))
    }. // here you may want to do some more processing
    map(x => (x._1, x._2.size)).
    mergeSubstreams

// assume the source is a file
Source.fromIterator(() => lines.iterator).
via(flow).
to(Sink.foreach(emitToQueue)).run()
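
Note that `to(...)` discards the sink's materialized value, so the run above gives you nothing to wait on. If you need to know when the whole stream has finished, one option is `runWith`, which keeps it. A minimal sketch, assuming Akka 2.4.2 or later (where, as far as I know, `Sink.foreach` materializes a `Future[Done]`); the object name and the sample data are made up for illustration:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object CompletionSketch {
  // hypothetical sample data standing in for the per-category counts
  val counts = List("DEBUG" -> 3, "INFO" -> 5)

  def run()(implicit system: ActorSystem): Future[Done] = {
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // runWith keeps the sink's materialized value: a Future that
    // completes when the stream finishes, or fails if the stream fails
    Source(counts).runWith(Sink.foreach[(String, Int)] { case (cat, n) =>
      println(s"$n messages from category $cat")
    })
  }
}
```

You can then attach `onComplete` or `map` to the returned future to run whatever has to happen after the last element.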

If you want to run it for multiple files and report the totals once at the end, for example, you can do it like this:

val futures = (1 to 4).map { x =>
  Source.fromIterator(() => lines.iterator).via(flow).toMat(Sink.seq[(String, Int)])(Keep.right).run()
}
Future.sequence(futures).map { results =>
  results.flatten.groupBy(_._1).foreach { case (cat, xs) =>
    val total = xs.map(_._2).sum
    println(s"$total messages from category $cat")
  }
}

As you can see, running the flow with Sink.seq and Keep.right gives you a Future. It completes with the materialized value (the result of the stream) once the stream has finished, and you can then do whatever you want with it.
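
The fold above buffers every message of a category before the database is touched. If you would rather load the data before the first element of each substream and keep processing element by element, `prefixAndTail` might be an option. This is only a sketch under assumptions: `PerKeySketch`, `loadData` and `countsPerCategory` are hypothetical names, the lookup is a stand-in for your DB call, and the per-element processing is reduced to counting:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.{ExecutionContext, Future}

object PerKeySketch {
  val categories = Vector("DEBUG", "INFO", "WARN", "ERROR")

  // hypothetical stand-in for the database lookup
  def loadData(catId: Int): Future[String] =
    Future.successful(categories(catId))

  def countsPerCategory(implicit ec: ExecutionContext): Flow[(Int, String), (String, Int), NotUsed] =
    Flow[(Int, String)]
      .groupBy(categories.size, _._1)
      // split each substream into its first element and a source of the rest
      .prefixAndTail(1)
      .mapAsync(1) { case (head, tail) =>
        // runs once per key, before any element of that key is processed
        loadData(head.head._1).map(data => (data, Source(head).concat(tail)))
      }
      .flatMapConcat { case (data, elems) =>
        // per-element processing with `data` would go here; this only counts
        elems.fold(0)((n, _) => n + 1).map(n => (data, n))
      }
      // each substream emits one (category, count) pair when it completes
      .mergeSubstreams
}
```

Run it like the flow above, for example `Source.fromIterator(() => lines.iterator).via(PerKeySketch.countsPerCategory).runWith(Sink.foreach(emitToQueue))`. One caveat, as far as I know: the tail source has to be subscribed within the materializer's subscription timeout (5 seconds by default), so a very slow lookup may need that setting raised.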

Upvotes: 1
