Artavazd Balayan

Reputation: 2413

Akka Streams: Copy element to another stream via alsoTo

I want to use alsoTo to copy the elements from one Source to another, but it does not work as I'm expecting. The example below creates an Akka Source from a Java InputStream, applies a transformation, and then uses alsoTo in an attempt to create a copy of s1:

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object Main {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("AkkaStreams_alsoTo")
    implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

    val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
    try {
      val s1: Source[List[List[String]], Future[IOResult]] = StreamConverters.fromInputStream(() => byteStream)
        .map { bs =>
          val rows = bs.utf8String.split("\n").toList
          val valuesPerRow = rows.map(row => row.split(",").toList)
          valuesPerRow
        }
      // A copy of s1?
      val s2: Source[List[List[String]], Future[IOResult]] = s1.alsoTo(Sink.collection)
      println("s1.runForeach: ")
      Await.result(s1.runForeach(println), 20.seconds)

      println("s2.runForeach: ")
      Await.result(s2.runForeach(println), 20.seconds)

      println("Done")
      system.terminate()
    }
    finally {
      byteStream.close()
    }
  }
}

It produces the following output:

s1.runForeach: 
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach: 
Done

As you can see, s2.runForeach doesn't print any elements. What is the reason for this behavior? Is it because of a side effect of reading the Java InputStream?

I'm using Akka Streams v2.6.8.

Upvotes: 1

Views: 977

Answers (1)

Jeffrey Chung

Reputation: 19497

I want to use alsoTo to copy the elements from one Source to another, but it does not work as I'm expecting.

alsoTo doesn't copy elements from a Source to another Source; it effectively copies the elements from a Source/Flow and sends them to another Sink (the argument to the alsoTo method). Therefore your expectation is incorrect.

As you can see, s2.runForeach doesn't print any elements. What is the reason for this behavior? Is it because of a side effect of reading the Java InputStream?

Because byteStream is a val, both s1.runForeach(println) and s2.runForeach(println) "share" this instance even though they are two distinct Akka Stream blueprints. Therefore, when s1.runForeach(println) is called, byteStream is consumed, and when s2.runForeach(println) is executed afterward, there is nothing left in the InputStream for s2.runForeach(println) to print.
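
The same effect can be reproduced without Akka. The following is only a minimal sketch, not part of the original code: the names SharedInputStreamDemo, drain, sharedStream, and freshStream are made up for illustration, and it relies solely on the JDK and the Scala standard library. It drains the same 17-byte payload twice through a val and twice through a def:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

import scala.io.Source

// Hypothetical demo object; none of these names come from Akka.
object SharedInputStreamDemo {
  private val data: Array[Byte] =
    "a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8)

  // Reads everything that is still left in the stream.
  def drain(in: InputStream): String =
    Source.fromInputStream(in, "UTF-8").mkString

  // val: a single instance, created once and shared by every reader.
  val sharedStream: InputStream = new ByteArrayInputStream(data)

  // def: a new, unread instance on every evaluation.
  def freshStream: InputStream = new ByteArrayInputStream(data)

  def main(args: Array[String]): Unit = {
    println(drain(sharedStream).length) // 17: the first reader consumes all bytes
    println(drain(sharedStream).length) // 0: nothing is left for the second reader
    println(drain(freshStream).length)  // 17
    println(drain(freshStream).length)  // 17: each reader got its own stream
  }
}
```

The two reads of sharedStream play the roles of s1.runForeach(println) and s2.runForeach(println) in your code: whichever runs second finds the stream already at its end.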

Change byteStream to a def so that each run gets its own fresh InputStream, and the following is printed:

s1.runForeach: 
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach: 
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done

That explains why s2.runForeach(println) doesn't print anything in this particular case, but it doesn't really show alsoTo in action. Your setup is flawed because s2.runForeach(println) just prints the elements from the Source, while whatever alsoTo(Sink.collection) collects is never observed: alsoTo keeps only the Source's materialized value, so the Future materialized by Sink.collection is discarded.

One easy way to see the behavior of alsoTo is the following:

val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))

val s1: Source[List[List[String]], Future[IOResult]] =
  StreamConverters.fromInputStream(() => byteStream)
    .map { bs =>
      val rows = bs.utf8String.split("\n").toList
      val valuesPerRow = rows.map(row => row.split(",").toList)
      valuesPerRow
    }

val stream = s1.alsoTo(Sink.foreach(println)).runWith(Sink.foreach(println))
                                          // ^ same thing as .runForeach(println)
Await.ready(stream, 5.seconds)
println("Done")
system.terminate()

Running the above prints the following:

List(List(a, b, c), List(d, e, f), List(g, h, j))
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done

The elements in the one Source are sent to both Sinks.

If you want to use Sink.collection...

import akka.stream.scaladsl.Keep

val (result1, result2) =
  s1.alsoToMat(Sink.collection)(Keep.right).toMat(Sink.collection)(Keep.both).run()
 // ^ note the use of alsoToMat in order to retain the materialized value

val res1 = Await.result(result1, 5.seconds)
val res2 = Await.result(result2, 5.seconds)
println(s"res1: $res1")
println(s"res2: $res2")
println("Done")
system.terminate()

...which prints...

res1: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
res2: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
Done

Again, the elements in one Source are sent to two Sinks.

Upvotes: 2
