Siddharth Shankar

Reputation: 539

Print Source[ByteString, NotUsed] values to console

How can I print the values of a Source to the console?

val someSource = Source.single(ByteString("SomeValue"))

I want to print the String "SomeValue" from this source. I tried:

someSource.to(Sink.foreach(println)) //This one prints RunnableGraph object

someSource.map(each => {
    val pqr = each.decodeString(ByteString.UTF_8)
    print(pqr)
}) // This one prints res3: someSource.Repr[Unit] = Source(SourceShape(Map.out(169373838)))

How do I print the original string that was used to create this single-element Source?

Upvotes: 0

Views: 392

Answers (1)

sarveshseri

Reputation: 13985

From what is written in the question, I think you are probably using the Scala console or a Scala worksheet.

The Scala console or worksheet prints a representation of whatever value the current statement creates. For example,

scala> val i = 5
val i: Int = 5

scala> val s = "ssfdf"
val s: String = ssfdf

But what happens when you use something like println here?

scala> val u = println("dfsd")
dfsd
val u: Unit = ()

It executes the println and afterwards prints that the value u produced by that println is a Unit.

And that is probably where your confusion comes from, because the println inside your Sink.foreach is not executed in this case.

That is because this case is more like the following, where you are only defining a function.

scala> val f1 = (s: String) => println(s)
val f1: String => Unit = $Lambda$1062/0x0000000800689840@1796b2d4

You are not calling println here, you are just defining a function (an instance of String => Unit or Function1[String, Unit]) which will call println when it is invoked.

So the console just prints that the value f1 created here is of type String => Unit.

You need to call this function to actually execute that println,

scala> f1.apply("dsfsd")
dsfsd
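The same "describe now, run later" idea can be sketched in plain Scala. Blueprint below is a hypothetical class invented for this illustration, not part of Akka; it only stands in for the role a RunnableGraph plays.

```scala
// Hypothetical stand-in for RunnableGraph: it only describes work.
final class Blueprint(work: () => Unit) {
  def run(): Unit = work() // the work happens here, not at construction
}

object BlueprintDemo {
  var runs = 0 // counts how often the described work was executed

  // Creating the blueprint executes nothing; a console would only
  // report that a value of type Blueprint was created.
  val blueprint = new Blueprint(() => { println("SomeValue"); runs += 1 })
}
```

Nothing is printed until BlueprintDemo.blueprint.run() is called.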

Similarly, someSource.to(Sink.foreach(println)) creates a value of type RunnableGraph, so the Scala console prints something like val res0: RunnableGraph....

You now need to run this graph to actually execute it.

But unlike the function example earlier, the graph executes asynchronously on a thread pool, which means that it might not work in some versions of the Scala console or worksheet (depending on how the thread pool lifecycle is managed). So, if you just do,

scala> val someSource = Source.single(ByteString("SomeValue"))
val someSource: akka.stream.scaladsl.Source[akka.util.ByteString,akka.NotUsed] = Source(SourceShape(single.out(369296388)))

scala> val runnableGraph = someSource.to(Sink.foreach(println))
val runnableGraph: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = RunnableGraph

scala> runnableGraph.run()

If it works, you will see the following,

scala> runnableGraph.run()
val res0: akka.NotUsed = NotUsed
ByteString(83, 111, 109, 101, 86, 97, 108, 117, 101)

But chances are that you will just see errors about the console failing to complete the graph run for some reason.

What you actually need is the materialized value of the Sink. Sink.foreach materializes a Future[Done], but to keeps only the Source's materialized value (NotUsed), so combine them with toMat and Keep.right instead. Running the graph then returns that Future[Done], and you can wait on it using Await.

You will have to put all this into a normal Scala file and execute it as a Scala application.
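The waiting part can be sketched with nothing but the standard library. This is only an illustration: the names AwaitDemo, printLater and runAndWait are made up here, and a plain Future stands in for the running graph.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitDemo {
  // Work that runs asynchronously on a thread pool, like a running graph.
  def printLater(value: String): Future[String] = Future {
    println(value)
    value
  }

  // Block the calling thread until the asynchronous work has completed.
  def runAndWait(value: String): String =
    Await.result(printLater(value), 5.seconds)
}
```

Without the Await, the calling thread may finish before the Future gets a chance to print, because the global execution context uses daemon threads that do not keep the JVM alive.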

import akka.{Done, actor}
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

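object TestAkkaStream extends App {

  val actorSystem = ActorSystem(Behaviors.empty, "test-stream-system")

  // The implicit classic system is what the stream uses to materialize the graph.
  implicit val classicActorSystem: actor.ActorSystem = actorSystem.classicSystem

  val someSource = Source.single(ByteString("SomeValue"))

  // Keep.right keeps the Sink's materialized value, a Future[Done].
  val runnableGraph = someSource.toMat(Sink.foreach(println))(Keep.right)

  val graphRunDoneFuture: Future[Done] = runnableGraph.run()

  // Block until the stream has completed.
  Await.result(graphRunDoneFuture, Duration.Inf)

  // Shut down the actor system so that the JVM can exit.
  actorSystem.terminate()
}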
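Note that Sink.foreach(println) on a ByteString prints its byte representation. To print the original text, decode each element first. The following is a minimal sketch of that, assuming Akka 2.6 or later (where an implicit classic ActorSystem is enough to run a stream); the names PrintDecoded and printDecoded are made up for this example, and runForeach is used as a shorthand for toMat(Sink.foreach(...))(Keep.right).run().

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PrintDecoded {
  // Decodes every ByteString as UTF-8 and prints it.
  // The returned Future completes when the stream is done.
  def printDecoded(source: Source[ByteString, _])(implicit system: ActorSystem): Future[Done] =
    source.map(_.utf8String).runForeach(println)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("print-decoded")
    val someSource = Source.single(ByteString("SomeValue"))
    try Await.result(printDecoded(someSource), 10.seconds) // prints SomeValue
    finally system.terminate() // stop the thread pool so the JVM can exit
  }
}
```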

Upvotes: 2
