Reputation: 6915
I am trying to come up with a solution to split an incoming String I am receiving into multiple Strings. I have been researching and it looks like in the previous versions of Akka-Streams there was a class Transformer
that you could extend to do this kind of transformation.
In the version I am using (RC2) there are Stage
s but I am not really sure how I can implement the splitting pattern.
Source.actorPublisher[String](MyActor.props).
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))
I am looking for the XXXXX
component that will allow me to input a String
and return a sequence of String
and will emit each one to the rest of the flow.
Upvotes: 3
Views: 2377
Reputation: 17933
Akka provides the Framing
helper functions for this type of problem.
Assuming your charset is UTF-8 you can write a function that takes in the maximum size of the delimited String
values and returns a Flow
that can perform the splitting:
import akka.stream.scaladsl.Framing
import akka.util.ByteString
val newLineSplitter : (Int) => Flow[String, String, NotUsed] =
(maxLineSize) =>
Flow[String]
.map(ByteString.apply)
.via(Framing delimiter (ByteString("\n"), maxLineSize))
.via(Flow[ByteString] map (_.utf8String))
Upvotes: 2
Reputation: 35453
I agree with @jrudolph that mapConcat
is probably what you are looking for. A quick example showing this method in action:
val strings = List(
"""hello
world
test
this""",
"""foo
bar
baz
"""
)
implicit val system = ActorSystem("test")
implicit val mater = ActorFlowMaterializer()
Source(strings).
mapConcat(_.split("\n").map(_.trim).toList).
runForeach(println)
If you run this code you will see the following printed out:
hello
world
test
this
foo
bar
baz
Upvotes: 5