hveiga

Reputation: 6915

Splitting inside a flow in Akka-Streams

I am trying to split each incoming String into multiple Strings. From my research, it looks like previous versions of Akka-Streams had a Transformer class that you could extend to do this kind of transformation.

In the version I am using (RC2) there are Stages but I am not really sure how I can implement the splitting pattern.

Source.actorPublisher[String](MyActor.props)
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))

I am looking for the XXXXX component that takes a String, returns a sequence of Strings, and emits each one to the rest of the flow.

Upvotes: 3

Views: 2377

Answers (2)

Akka provides the Framing helper functions for this type of problem.

Assuming your charset is UTF-8 you can write a function that takes in the maximum size of the delimited String values and returns a Flow that can perform the splitting:

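import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing}
import akka.util.ByteString

// Builds a Flow that re-frames the incoming Strings on "\n".
// maxLineSize is the maximum frame length in bytes; a longer line fails the stream.
val newLineSplitter: Int => Flow[String, String, NotUsed] =
  maxLineSize =>
    Flow[String]
      .map(ByteString(_))                                     // encode each String as UTF-8 bytes
      .via(Framing.delimiter(ByteString("\n"), maxLineSize))  // emit one ByteString per line
      .map(_.utf8String)                                      // decode each line back to a String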
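
One thing worth noting about the Framing approach is that it treats the incoming elements as one continuous byte stream, so a line may start in one element and finish in the next. The sketch below imitates that re-framing idea on plain Scala collections, without Akka. It is only an illustration, not Akka's implementation, and `FramingSketch` and `frame` are hypothetical names. It also silently drops an unterminated final line, whereas Framing.delimiter by default (allowTruncation = false) fails the stream in that case.

```scala
object FramingSketch {
  // Hypothetical helper: carries an unfinished line over from chunk to chunk
  // and collects only the lines that were terminated by "\n".
  def frame(chunks: List[String]): List[String] = {
    val (complete, _) = chunks.foldLeft((Vector.empty[String], "")) {
      case ((acc, buffer), chunk) =>
        // limit -1 keeps the trailing remainder, even when it is empty
        val parts = (buffer + chunk).split("\n", -1)
        (acc ++ parts.init, parts.last)
    }
    complete.toList // any leftover without a trailing "\n" is dropped in this sketch
  }

  def main(args: Array[String]): Unit =
    // "world" is spread across two chunks but comes out as one line
    frame(List("hello\nwor", "ld\n")).foreach(println)
}
```

With the two chunks above, the sketch prints hello and world, even though "world" arrives split across two elements.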

Upvotes: 2

cmbaxter

Reputation: 35453

I agree with @jrudolph that mapConcat is probably what you are looking for. A quick example showing this method in action:

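import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source

val strings = List(
  """hello
     world
     test
     this""",
  """foo
     bar
     baz
     """
)

implicit val system = ActorSystem("test")
implicit val mater = ActorFlowMaterializer()

// Split each element on newlines, trim the pieces, and emit them one by one
Source(strings)
  .mapConcat(_.split("\n").map(_.trim).toList)
  .runForeach(println)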

If you run this code you will see the following printed out:

hello
world
test
this
foo
bar
baz
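
The function passed to mapConcat can also be checked without a running stream, because mapConcat expands each element into zero or more elements much like flatMap does on a List. A minimal sketch, assuming plain Scala collections only. `SplitDemo` and `splitLines` are hypothetical names, and the filter for blank pieces is an addition that is not part of the answer above:

```scala
object SplitDemo {
  // Hypothetical helper: the same split-and-trim function as in the answer,
  // plus a filter that drops pieces left empty after trimming.
  def splitLines(s: String): List[String] =
    s.split("\n").map(_.trim).filter(_.nonEmpty).toList

  val strings: List[String] =
    List("hello\n world\n test\n this", "foo\n bar\n baz\n ")

  // flatMap on a List mirrors mapConcat on a stream:
  // every input element expands to a list of output elements.
  val lines: List[String] = strings.flatMap(splitLines)

  def main(args: Array[String]): Unit =
    lines.foreach(println)
}
```

Without the filter, the whitespace-only remainder after "baz" would be trimmed to an empty String and emitted as an extra blank element.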

Upvotes: 5
