user13771733

Reputation: 25

Akka stream - drop message if condition is true

In this example, I have a stream of Ticker instances, which have a sequence attribute.

I want to drop any messages where the sequence number is lower than the previous one.

I can do something like the following, but it's pretty ugly. Is there a simpler approach? And is there a name for this pattern?

    source
        .scan(TickerInOrder())((state, ticker) => TickerInOrder(state, ticker))
        .collect { case TickerInOrder(Some(ticker), Some(inOrder)) if inOrder => ticker }

    // ~~~~~~~~

    object TickerInOrder {
      def apply(state: TickerInOrder, ticker: Ticker): TickerInOrder = {
        val inOrder = state.ticker match {
          case Some(prev) => ticker.sequence > prev.sequence
          case None => true
        }
        TickerInOrder(Some(ticker), Some(inOrder))
      }
    }

    case class TickerInOrder(ticker: Option[Ticker] = None, inOrder: Option[Boolean] = None)

Upvotes: 0

Views: 154

Answers (1)

yiksanchan

Reputation: 1940

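You can use statefulMapConcat, which lets you keep mutable state (here, the previous element) for each materialization of the stream and decide per element whether to emit it. See the docs: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html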

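    import akka.actor.ActorSystem
    import akka.stream.scaladsl.Source

    object Stateful {
      def main(args: Array[String]): Unit = {
        implicit val system: ActorSystem = ActorSystem("Stateful")
        Source(List(1, 3, 2, 4, 5, 6, 0, 7))
          .statefulMapConcat { () =>
            // Created once per materialization; 0L assumes all inputs are positive.
            var prev = 0L
            element => {
              // Compare against the element immediately before this one,
              // whether or not that element was emitted.
              val inOrder = element > prev
              prev = element
              if (inOrder) element :: Nil else Nil
            }
          }
          .runForeach(println)
          // Shut the actor system down when the stream completes so the JVM can exit.
          .onComplete(_ => system.terminate())(system.dispatcher)
        // 1 3 4 5 6 7
      }
    }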

To make this work with Ticker, compare ticker.sequence against the previous sequence instead of comparing the raw elements.
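For illustration, here is a minimal sketch of what that adaptation might look like. The Ticker case class (with a made-up symbol field), the object name DropOutOfOrder, and the helper run are assumptions for the example and are not from the question; only the sequence attribute is. It also keeps the previous sequence in an Option rather than starting from 0, so the first ticker always passes regardless of its value.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for the question's Ticker; only `sequence` matters here.
final case class Ticker(symbol: String, sequence: Long)

object DropOutOfOrder {
  // Emits a ticker only when its sequence is greater than that of the ticker
  // immediately before it. The state is re-created for every materialization.
  val inOrderOnly: Flow[Ticker, Ticker, NotUsed] =
    Flow[Ticker].statefulMapConcat { () =>
      var prev: Option[Long] = None // None until the first ticker arrives
      ticker => {
        val inOrder = prev.forall(ticker.sequence > _)
        prev = Some(ticker.sequence)
        if (inOrder) ticker :: Nil else Nil
      }
    }

  // Helper for the example: runs the flow over the given sequence numbers
  // and returns the sequence numbers that were let through.
  def run(sequences: List[Long])(implicit system: ActorSystem): Seq[Long] = {
    val result = Source(sequences.map(Ticker("ABC", _)))
      .via(inOrderOnly)
      .map(_.sequence)
      .runWith(Sink.seq)
    Await.result(result, 5.seconds) // blocking is acceptable in a small demo
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("DropOutOfOrder")
    try println(run(List(1, 3, 2, 4, 5, 6, 0, 7))) // lets through 1, 3, 4, 5, 6, 7
    finally system.terminate()
  }
}
```

Like the question's scan version, this compares each ticker with the one directly before it, even if that one was dropped. If you would rather compare against the last ticker that was actually emitted, update prev only when inOrder is true.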

Upvotes: 2
