M. M.
Reputation: 332

Akka streams — filtering by the number of elements in stream

I'm writing an app in Scala and I'm using Akka streams.

At one point, I need to filter out streams that have fewer than N elements, with N given. So, for example, with N=5:

Source(List(1,2,3)).via(myFilter)       // => List()
Source(List(1,2,3,4)).via(myFilter)     // => List()

will become empty streams, and

Source(List(1,2,3,4,5)).via(myFilter)   // => List(1,2,3,4,5)
Source(List(1,2,3,4,5,6)).via(myFilter) // => List(1,2,3,4,5,6)

will be unchanged.

Of course, we can't know the number of elements in the stream until it's over, and waiting till the end before pushing it through might not be the best idea.

So, instead, I've thought about the following algorithm:

  1. for the first N-1 elements, just buffer them, without passing further;
  2. if the input stream finishes before reaching the Nth element, output an empty stream;
  3. if the input stream reaches the Nth element, output the buffered N-1 elements, then output the Nth element, then pass through all the elements that follow.
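
To make the intended behaviour concrete, the three steps above can be modelled on a plain Iterator, outside of Akka. This is only a sketch of the semantics, not a Flow; the names AtLeastSketch and atLeast are made up for illustration.

```scala
object AtLeastSketch {
  // Hypothetical helper (not an Akka API): yields all of `in` if it has at
  // least n elements, otherwise yields nothing. At most n elements are buffered.
  def atLeast[A](n: Int)(in: Iterator[A]): Iterator[A] = {
    val buffer = Vector.newBuilder[A]
    var count = 0
    // Step 1: buffer elements until the Nth one arrives or the input ends.
    while (count < n && in.hasNext) {
      buffer += in.next()
      count += 1
    }
    if (count < n) Iterator.empty       // Step 2: input ended early, emit nothing
    else buffer.result().iterator ++ in // Step 3: replay the buffer, then pass the rest lazily
  }

  def main(args: Array[String]): Unit = {
    println(atLeast(5)(Iterator(1, 2, 3)).toList)          // List()
    println(atLeast(5)(Iterator(1, 2, 3, 4, 5, 6)).toList) // List(1, 2, 3, 4, 5, 6)
  }
}
```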

However, I have no idea how to build a Flow element implementing it. Are there some built-in Akka elements I could use?

Edit:

Okay, so I played with it yesterday and came up with something like this:

Flow[Int]
  .prefixAndTail(N)
  .flatMapConcat {
    case (prefix, tail) if prefix.length == N =>
      Source(prefix).concat(tail)
    case _ =>
      Source.empty[Int]
  }

Will it do what I want?

Upvotes: 3

Views: 1692

Answers (2)

This may be one of those instances where a little "state" can go a long way. Even though the solution is not "purely functional", the mutable state will be isolated and unreachable by the rest of the system. I think this is one of the beauties of Scala: when an FP solution isn't obvious, you can always fall back on imperative code in an isolated manner...

The completed Flow will be a combination of multiple sub-parts. The first Flow will just group your elements into sequences of size N:

val group : Int => Flow[Int, Seq[Int], _] = 
  (n) => Flow[Int].grouped(n)

Now for the non-functional part, a filter that will only allow the grouped Seq values through if the first sequence was the right size:

val minSizeRequirement : Int => Seq[Int] => Boolean = 
  (minSize) => {
    // Mutable state, captured by the predicate returned below
    var isFirst : Boolean = true

    var passedMinSize : Boolean = false

    (testSeq) => {
      if(isFirst) {
        // Only the first group decides whether anything passes
        isFirst = false
        passedMinSize = testSeq.size >= minSize
        passedMinSize
      }
      else
        passedMinSize
    }
  }

val minSizeFilter : Int => Flow[Seq[Int], Seq[Int], _] = 
  (minSize) => Flow[Seq[Int]].filter(minSizeRequirement(minSize))

The last step is to convert the Seq[Int] values back into Int values:

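// mapConcat(_.toList) avoids Source(l), which needs an immutable collection
// (on Scala 2.12 a plain Seq[Int] is not guaranteed to be immutable)
val flatten = Flow[Seq[Int]].mapConcat(_.toList)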

Finally, combine them all together:

val combinedFlow : Int => Flow[Int, Int, _] =
  (minSize) => 
    group(minSize) 
      .via(minSizeFilter(minSize))
      .via(flatten)
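
One caveat that seems worth checking: the two vars are created when minSizeRequirement(minSize) is called, not each time the stream is run, so a flow value that is materialized more than once would carry its state over from the previous run. The sketch below reproduces this with plain collections instead of Akka. The predicate is restated so the example is self-contained, and the run helper is a made-up stand-in for group, filter and flatten.

```scala
object MinSizeSketch {
  // Same stateful predicate as in the answer, restated for a standalone example.
  def minSizeRequirement(minSize: Int): Seq[Int] => Boolean = {
    var isFirst = true
    var passedMinSize = false
    testSeq => {
      if (isFirst) {
        isFirst = false
        passedMinSize = testSeq.size >= minSize
      }
      passedMinSize
    }
  }

  // Hypothetical stand-in for group -> filter -> flatten on a plain List.
  def run(input: List[Int], minSize: Int, keep: Seq[Int] => Boolean): List[Int] =
    input.grouped(minSize).filter(keep).toList.flatten

  def main(args: Array[String]): Unit = {
    // Fresh predicate per run: behaves as intended.
    println(run(List(1, 2, 3, 4, 5, 6), 5, minSizeRequirement(5))) // List(1, 2, 3, 4, 5, 6)

    // One predicate shared by two runs: the second run sees stale state.
    val shared = minSizeRequirement(5)
    println(run(List(1, 2, 3), 5, shared))          // List()
    println(run(List(1, 2, 3, 4, 5, 6), 5, shared)) // List(), because isFirst is already false
  }
}
```

Calling combinedFlow(minSize) afresh for every stream, rather than storing the result in a val and reusing it, should avoid the problem. Note also that elements after the first group are only emitted once a full group of minSize has accumulated or the stream ends.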

Upvotes: 1

Perhaps statefulMapConcat could help you:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, Materializer}

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext

object StatefulMapConcatExample extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  def filterLessThen(threshold: Int): (Int) => List[Int] = {
    var buffering = true
    val buffer: ListBuffer[Int] = ListBuffer()
    (elem: Int) =>
      if (buffering) {
        buffer += elem
        if (buffer.size < threshold) {
          Nil
        } else {
          buffering = false
          buffer.toList
        }
      } else {
        List(elem)
      }
  }

  // prints Vector() (Sink.seq yields an empty sequence, not Nil)
  Source(List(1, 2, 3)).statefulMapConcat(() => filterLessThen(5))
    .runWith(Sink.seq).map(println)

  // prints Vector()
  Source(List(1, 2, 3, 4)).statefulMapConcat(() => filterLessThen(5))
    .runWith(Sink.seq).map(println)

  // prints Vector(1, 2, 3, 4, 5)
  Source(List(1, 2, 3, 4, 5)).statefulMapConcat(() => filterLessThen(5))
    .runWith(Sink.seq).map(println)

  // prints Vector(1, 2, 3, 4, 5, 6)
  Source(List(1, 2, 3, 4, 5, 6)).statefulMapConcat(() => filterLessThen(5))
    .runWith(Sink.seq).map(println)
}
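
Because statefulMapConcat takes a factory (the `() => ...` part), each run of the stream gets its own buffer. The function itself is just an Int => List[Int] with private state, so it can also be exercised without an ActorSystem. A minimal sketch, using List.flatMap as a stand-in for the stream (the object name is made up for illustration):

```scala
import scala.collection.mutable.ListBuffer

object FilterLessThenSketch {
  // Same logic as filterLessThen above, restated for a standalone example.
  def filterLessThen(threshold: Int): Int => List[Int] = {
    var buffering = true
    val buffer = ListBuffer.empty[Int]
    elem =>
      if (!buffering) List(elem) // threshold already reached: pass through
      else {
        buffer += elem
        if (buffer.size < threshold) Nil             // still below threshold: hold back
        else { buffering = false; buffer.toList }    // threshold reached: flush the buffer
      }
  }

  def main(args: Array[String]): Unit = {
    // A new function (and therefore a new buffer) is created for each input.
    println(List(1, 2, 3).flatMap(filterLessThen(5)))          // List()
    println(List(1, 2, 3, 4, 5, 6).flatMap(filterLessThen(5))) // List(1, 2, 3, 4, 5, 6)
  }
}
```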

Upvotes: 2
