Reputation: 2413
I want to use alsoTo to copy the elements from one Source to another, but it does not work as I expect. Here is an example that creates an Akka Source from a Java InputStream, applies a transformation, and uses alsoTo to create a copy of s1:
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object Main {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("AkkaStreams_alsoTo")
    implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
    val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
    try {
      val s1: Source[List[List[String]], Future[IOResult]] = StreamConverters.fromInputStream(() => byteStream)
        .map { bs =>
          val rows = bs.utf8String.split("\n").toList
          val valuesPerRow = rows.map(row => row.split(",").toList)
          valuesPerRow
        }
      // A copy of s1?
      val s2: Source[List[List[String]], Future[IOResult]] = s1.alsoTo(Sink.collection)
      println("s1.runForeach: ")
      Await.result(s1.runForeach(println), 20.seconds)
      println("s2.runForeach: ")
      Await.result(s2.runForeach(println), 20.seconds)
      println("Done")
      system.terminate()
    }
    finally {
      byteStream.close()
    }
  }
}
It produces the following output:
s1.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach:
Done
As you can see, s2.runForeach doesn't print any elements. What is the reason for this behavior? Is it a side effect of reading the Java InputStream?
I'm using Akka Streams v2.6.8.
Upvotes: 1
Views: 977
Reputation: 19497
> I want to use alsoTo to copy the elements from one Source to another, but it does not work as I'm expecting.
alsoTo doesn't copy elements from one Source to another Source; it sends each element that passes through a Source/Flow to an additional Sink (the argument to the alsoTo method), while the elements also continue downstream. Therefore your expectation is incorrect.
> As you can see, s2.runForeach doesn't print any elements. What is the reason of such behavior--is it because of a side effect when it reads the Java InputStream?
Because byteStream is a val, both s1.runForeach(println) and s2.runForeach(println) share the same InputStream instance, even though s1 and s2 are two distinct Akka Streams blueprints. When s1.runForeach(println) runs, it consumes byteStream; when s2.runForeach(println) runs afterward, there is nothing left in the InputStream for it to print.
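The consumption behavior can be reproduced without Akka at all. The following is a minimal sketch using only the standard library; the names SharedInputStreamDemo, drain, and newStream are made up for illustration and are not part of the question's code:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets
import scala.io.Source

object SharedInputStreamDemo {
  // Hypothetical helper: reads whatever is still unread in the stream as UTF-8 text.
  def drain(in: InputStream): String =
    Source.fromInputStream(in, "UTF-8").mkString

  // Hypothetical factory: every call returns a new, unread stream.
  def newStream(): InputStream =
    new ByteArrayInputStream("a,b,c".getBytes(StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit = {
    // One shared instance: the first reader consumes it, the second finds it empty.
    val shared = newStream()
    println(s"shared, first read:  '${drain(shared)}'")
    println(s"shared, second read: '${drain(shared)}'")

    // A fresh instance per reader: both reads see the full content.
    println(s"fresh, first read:   '${drain(newStream())}'")
    println(s"fresh, second read:  '${drain(newStream())}'")
  }
}
```

The second read of the shared instance yields an empty string, which mirrors what happens to s2.runForeach(println) in the question.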
Change byteStream to a def, and the following is printed:
s1.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done
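For reference, the val-to-def change might look like the sketch below. It assumes Akka 2.6.x is on the classpath (where an implicit ActorSystem is enough to run a stream); the object name FreshStreamPerRun is made up for illustration, and the same Source is simply run twice to show that each run gets its own stream:

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
import akka.stream.scaladsl.StreamConverters
import scala.concurrent.Await
import scala.concurrent.duration._

object FreshStreamPerRun {
  // `def` rather than `val`: every call creates a new, unread InputStream.
  def byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))

  // The factory function () => byteStream is called when the stream is run,
  // so each run reads from its own InputStream.
  val s1 = StreamConverters.fromInputStream(() => byteStream)
    .map(bs => bs.utf8String.split("\n").toList.map(_.split(",").toList))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("FreshStreamPerRun")
    try {
      println("first run:")
      Await.result(s1.runForeach(println), 20.seconds)
      println("second run:")
      Await.result(s1.runForeach(println), 20.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

With a def, a byteStream.close() in a finally block would close yet another new instance rather than the ones the streams read from, so it no longer serves a purpose there.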
That explains why s2.runForeach(println) doesn't print anything in this particular case, but it doesn't really show alsoTo in action. Your setup is flawed because s2.runForeach(println) just prints the elements from the Source and ignores the materialized value of alsoTo(Sink.collection): alsoTo keeps only the upstream materialized value, so the Future produced by Sink.collection is discarded.
One easy way to see the behavior of alsoTo is the following:
val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
val s1: Source[List[List[String]], Future[IOResult]] =
  StreamConverters.fromInputStream(() => byteStream)
    .map { bs =>
      val rows = bs.utf8String.split("\n").toList
      val valuesPerRow = rows.map(row => row.split(",").toList)
      valuesPerRow
    }
// .runWith(Sink.foreach(println)) is the same thing as .runForeach(println)
val stream = s1.alsoTo(Sink.foreach(println)).runWith(Sink.foreach(println))
Await.ready(stream, 5.seconds)
println("Done")
system.terminate()
Running the above prints the following:
List(List(a, b, c), List(d, e, f), List(g, h, j))
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done
The elements in the one Source are sent to both Sinks.
If you want to use Sink.collection...
import akka.stream.scaladsl.Keep

// Note the use of alsoToMat (instead of alsoTo) in order to retain the materialized value
val (result1, result2) =
  s1.alsoToMat(Sink.collection)(Keep.right).toMat(Sink.collection)(Keep.both).run()
val res1 = Await.result(result1, 5.seconds)
val res2 = Await.result(result2, 5.seconds)
println(s"res1: $res1")
println(s"res2: $res2")
println("Done")
system.terminate()
...which prints...
res1: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
res2: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
Done
Again, the elements in one Source are sent to two Sinks.
Upvotes: 2