Reputation: 539
How can I print the values of a source in the console?
val someSource = Source.single(ByteString("SomeValue"))
I want to print the String "SomeValue" from this source. I tried:
someSource.to(Sink.foreach(println)) // This one prints a RunnableGraph object
someSource.map(each => {
  val pqr = each.decodeString(ByteString.UTF_8)
  print(pqr)
}) // This one prints res3: someSource.Repr[Unit] = Source(SourceShape(Map.out(169373838)))
How do I print the original string that was used to create the single-element Source?
Upvotes: 0
Views: 392
Reputation: 13985
From what is written in the question, I think you are probably using the Scala console or a Scala worksheet.
The Scala console or worksheet prints a representation of whatever the current statement creates. For example,
scala> val i = 5
val i: Int = 5
scala> val s = "ssfdf"
val s: String = ssfdf
But what happens when you use something like println here?
scala> val u = println("dfsd")
dfsd
val u: Unit = ()
It also executes the println, and afterwards prints that the value u created by that println is actually a Unit.
And that is where your confusion is probably coming from, because your println in Sink.foreach is not being executed in this case.
That is because this case is more like the following, where you are actually defining a function.
scala> val f1 = (s: String) => println(s)
val f1: String => Unit = $Lambda$1062/0x0000000800689840@1796b2d4
You are not using println here; you are just defining a function (an instance of String => Unit or Function1[String, Unit]) which will use println.
So, the console just prints that the value f1 created here is of type String => Unit.
You need to call this function to actually execute that println:
scala> f1.apply("dsfsd")
dsfsd
Similarly, someSource.to(Sink.foreach(println)) will create a value of type RunnableGraph, hence the Scala console will print something like val res0: RunnableGraph...
You now need to run this graph to actually execute it.
But unlike the function example earlier, the graph executes asynchronously on a thread pool, which means that it might not work in some versions of the Scala console or worksheet (depending on how the thread pool lifecycle is managed). So, if you just do,
scala> val someSource = Source.single(ByteString("SomeValue"))
val someSource: akka.stream.scaladsl.Source[akka.util.ByteString,akka.NotUsed] = Source(SourceShape(single.out(369296388)))
scala> val runnableGraph = someSource.to(Sink.foreach(println))
val runnableGraph: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = RunnableGraph
scala> runnableGraph.run()
If it works, you will see the following:
scala> runnableGraph.run()
val res0: akka.NotUsed = NotUsed
ByteString(83, 111, 109, 101, 86, 97, 108, 117, 101)
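Note that the output above is the raw bytes, not the text. As a minimal sketch using only the standard library (the object and method names here are hypothetical, chosen for illustration), decoding those same byte values as UTF-8 gives back the original string:

```scala
import java.nio.charset.StandardCharsets

object DecodeBytes {
  // The byte values shown in the printed ByteString above.
  val printedBytes: Array[Byte] =
    Array[Byte](83, 111, 109, 101, 86, 97, 108, 117, 101)

  // Decode raw bytes as UTF-8 text.
  def decodeUtf8(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit =
    println(decodeUtf8(printedBytes)) // prints SomeValue
}
```

In the stream itself, the equivalent step would presumably be a map before the sink, using either each.utf8String or the each.decodeString(ByteString.UTF_8) call from the question.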
But chances are that you will just see some errors related to the console failing to complete the graph run for some reason.
You actually need to materialize the Sink, which results in a Future[Done] when the graph is run. Then you have to wait on that Future[Done] using Await.
You will have to put all this into a normal Scala file and execute it as a Scala application.
import akka.{Done, actor}
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object TestAkkaStream extends App {
  val actorSystem = ActorSystem(Behaviors.empty, "test-stream-system")
  // The classic system is picked up implicitly to materialize the stream
  implicit val classicActorSystem: actor.ActorSystem = actorSystem.classicSystem

  val someSource = Source.single(ByteString("SomeValue"))
  // Keep.right keeps the Future[Done] materialized by Sink.foreach
  val runnableGraph = someSource.toMat(Sink.foreach(println))(Keep.right)
  val graphRunDoneFuture: Future[Done] = runnableGraph.run()

  // Block until the stream has completed
  Await.result(graphRunDoneFuture, Duration.Inf)
  // Shut down the actor system so the application can exit
  actorSystem.terminate()
}
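The waiting step can also be seen in isolation, without Akka. The following is a rough sketch using only scala.concurrent; runAsync is a hypothetical stand-in for running the graph, not part of any Akka API. The work happens on another thread, and Await.result keeps the main thread alive until it has finished:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSketch {
  // Hypothetical stand-in for a stream run: prints on a pool thread
  // and completes the Future once the printing is done.
  def runAsync(value: String): Future[String] =
    Future {
      println(value)
      value
    }

  def main(args: Array[String]): Unit = {
    val done: Future[String] = runAsync("SomeValue")
    // Block until the asynchronous work completes, so the program
    // does not exit before the value has been printed.
    Await.result(done, 5.seconds)
  }
}
```

Without the Await.result call, main could return before the pool thread has printed anything, which is the same kind of lifecycle problem described for the console above.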
Upvotes: 2