softshipper

Reputation: 34099

The meaning of the Keep combination?

I am trying to understand the Keep combinators in Akka Streams and created the following example:

import java.nio.file.Paths

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.util.{Failure, Success}

object FileConsumer extends App {

  implicit val system = ActorSystem("reactive-tweets")
  implicit val materializer = ActorMaterializer()

  val source: Source[Int, NotUsed] = Source(1 to 100)
  val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

  val result: Future[IOResult] =
    factorials.map(_.toString).runWith(lineSink("factorial2.txt"))

  implicit val ec = system.dispatcher
  result.onComplete {
    case Success(v) => println(s"Fileinfo ${ v.count }")
    case Failure(e) => println(e)
  }

  def lineSink(filename: String): Sink[String, Future[IOResult]] =
    Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)


} 

On the Akka Streams website it says:

The resulting blueprint is a Sink[String, Future[IOResult]], which means that it accepts strings as its input and when materialized it will create auxiliary information of type Future[IOResult] (when chaining operations on a Source or Flow the type of the auxiliary information—called the “materialized value”—is given by the leftmost starting point; since we want to retain what the FileIO.toPath sink has to offer, we need to say Keep.right).

But what if I want to keep the ByteString on the left side? I've tried:

  def lineSink2(filename: String): Sink[String, Future[ByteString]] =
    Flow[String].map(s => ByteString(s + "\n")).toMat(Sink.foreach(println))(Keep.left)

But it does not compile at all.

I also do not understand:

is given by the leftmost starting point

Is the leftmost starting point the Flow?

I think I do not understand the idea of Keep yet.

Upvotes: 4

Views: 126

Answers (1)

Łukasz Gawron

Reputation: 917

The definition of Sink.foreach is as follows:

def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]

That means its materialised value is Future[Done].

In the case of the flow you have:

 val value: Flow[String, ByteString, NotUsed] = Flow[String].map(s => ByteString(s + "\n"))

Its materialised value is NotUsed.

In this case:

Keep.left - NotUsed - Materialised Value of Source or Flow

Keep.right - Future[Done] - Materialised Value of Sink

Keep.both - (NotUsed, Future[Done])
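
To make the three options concrete, here is a minimal sketch that builds the same flow-plus-sink blueprint three times and lets the compiler confirm the resulting types. The names KeepVariants, toBytes and printSink are made up for illustration; nothing is run, so no ActorSystem is needed.

```scala
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.util.ByteString

import scala.concurrent.Future

// Hypothetical holder object; only blueprints are built here, nothing is materialised.
object KeepVariants {

  // Left side: the flow from the question, materialised value NotUsed.
  val toBytes: Flow[String, ByteString, NotUsed] =
    Flow[String].map(s => ByteString(s + "\n"))

  // Right side: Sink.foreach, materialised value Future[Done].
  val printSink: Sink[ByteString, Future[Done]] =
    Sink.foreach[ByteString](bs => print(bs.utf8String))

  // Keep.left keeps the flow's value, so the sink materialises to NotUsed.
  val keepLeft: Sink[String, NotUsed] =
    toBytes.toMat(printSink)(Keep.left)

  // Keep.right keeps the sink's value.
  val keepRight: Sink[String, Future[Done]] =
    toBytes.toMat(printSink)(Keep.right)

  // Keep.both keeps both values as a tuple.
  val keepBoth: Sink[String, (NotUsed, Future[Done])] =
    toBytes.toMat(printSink)(Keep.both)
}
```

This also suggests why lineSink2 in the question fails to compile: with Keep.left the result type is Sink[String, NotUsed], not Sink[String, Future[ByteString]].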

An important fact is that the materialised value is, in many cases, NOT the value of the elements flowing through the stream. It is rather:

  • diagnostic information
  • stream state
  • other kind of information about stream
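
If the goal really is to get the bytes back as a materialised value, the sink itself has to produce them. A rough sketch, assuming Akka 2.5.x as in the question (ActorMaterializer is deprecated in later versions) and using Sink.fold to concatenate everything; CollectBytes, run and concatSink are hypothetical names:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of the original question.
object CollectBytes {

  def run(): ByteString = {
    implicit val system: ActorSystem = ActorSystem("collect-bytes")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Sink.fold materialises to a Future of the accumulated value,
    // so Keep.right yields Future[ByteString].
    val concatSink: Sink[String, Future[ByteString]] =
      Flow[String]
        .map(s => ByteString(s + "\n"))
        .toMat(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))(Keep.right)

    val result: Future[ByteString] = Source(List("a", "b")).runWith(concatSink)

    // Blocking is only for the sake of a short demo.
    try Await.result(result, 3.seconds)
    finally system.terminate()
  }
}
```

Here the Future[ByteString] comes from the sink on the right, which is why Keep.right is still the combinator to use; Keep.left would only give back the flow's NotUsed.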

Upvotes: 4
