coder25

Reputation: 2393

akka stream materialized value using different options

I am new to Akka Streams and want to understand how materialization works.

//Print sum of elements from 1 to 10
val newSource = Source(1 to 10)
val flow      = Flow[Int].fold(0)((a, b) => a + b)
val sink      = Sink.foreach(println)
val sumFuture = newSource.via(flow).toMat(sink)(Keep.left).run()

It prints the value 55 with both Keep.left and Keep.right. How do the two differ?

I would like to see a case where Keep.left and Keep.right give different values, as well as how to use Keep.both.

Upvotes: 2

Views: 872

Answers (2)

Levi Ramsey

Reputation: 20541

It's important to remember that a stream stage can have

  • a materialized value, which is created when the stream is materialized, before any element has passed through that materialization. Accordingly, it cannot depend on the values which pass through/into/out of the stream.

  • zero or more output values which are passed to the next stage of the stream as the stream runs.

Every stage has a materialized value. Every stage that isn't a sink potentially has output values. For sources, in general, the materialized value provides some means to affect the stream's behavior (e.g. the materialized value of Source.actorRef is an ActorRef which allows you to push elements to the stream by sending a message to that ActorRef, or the various Kafka consumer sources in Alpakka Kafka allow you to stop consuming from Kafka without stopping the stream until the stream is drained).

For sinks, in general, the only way to get a value out of the sink is through the materialized value (since there's no output). Because the materialized value has to be created before any data has flowed through the stream, most sinks materialize to a Future (a placeholder for data which isn't yet available) and generally won't complete that Future until the stream completes (since Futures are write-at-most-once).

Every stage has a materialized value, but not every stage has a meaningful materialized value: for those, the special NotUsed value (a singleton) encodes "not meaningful". Most of the flow stages are in this category: they exist solely to transform input to output.
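Applied to the stream in the question, this can be sketched as follows. It is only an illustration, assuming Akka 2.6+ (where an implicit ActorSystem supplies the materializer); the object name and the 3-second timeout are arbitrary. The 55 is printed by Sink.foreach as a side effect of processing the element, so it appears regardless of which Keep is chosen; what changes is the type and value that run() returns.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepOnQuestionStream extends App {
  // Assumption: Akka 2.6+, so the implicit ActorSystem provides the materializer.
  implicit val system: ActorSystem = ActorSystem("keep-demo")

  val source: Source[Int, NotUsed]  = Source(1 to 10)
  val flow: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
  val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

  // Keep.left: the upstream (source + flow) materialized value, i.e. NotUsed.
  val left: NotUsed = source.via(flow).toMat(sink)(Keep.left).run()

  // Keep.right: the sink's materialized value, a Future completed when the stream ends.
  val right: Future[Done] = source.via(flow).toMat(sink)(Keep.right).run()

  // Keep.both: a tuple of the two.
  val both: (NotUsed, Future[Done]) = source.via(flow).toMat(sink)(Keep.both).run()

  // Blocking is for demonstration only; Keep.left leaves no handle to wait on.
  Await.result(right, 3.seconds)
  Await.result(both._2, 3.seconds)
  system.terminate()
}
```

None of the three materialized values is 55: the sum only ever travels through the stream as an element.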

Upvotes: 0

Ivan Stanislavciuc

Reputation: 7275

A materialised value can be produced by both the sink and the source. A runnable graph is created by connecting a source to a sink, and Keep defines which materialised value to keep when combining them:

  • Keep.right picks the materialised value of the sink
  • Keep.left picks the materialised value of the source
  • Keep.both picks both, in the form of a tuple
  • Keep.none ignores both and picks NotUsed, i.e. a marker indicating that there is no materialised value

By default, Keep.left is used by operations such as via and to.

The following examples highlight this.

Given a Source[Int, String] and a Sink[Int, Future[Int]]:

val source: Source[Int, String] = Source(List(1, 2, 3)).mapMaterializedValue(_ => "Source Mat Value")
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

We can combine a source and sink to create a runnable graph with different materialised values.

val left: String = source.to(sink).run() //same as toMat(...)(Keep.left)
val right: Future[Int] = source.toMat(sink)(Keep.right).run()
val both: (String, Future[Int]) = source.toMat(sink)(Keep.both).run()

Now, if we run it and print every materialised value (after the futures have completed), it produces the following:

left=Source Mat Value
right=Future(Success(6))
both=(Source Mat Value,Future(Success(6)))
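The snippets above leave out the imports and the actor system. A self-contained version might look like the sketch below, assuming Akka 2.6+ (an implicit ActorSystem provides the materializer). The object name and the timeout are illustrative, Keep.none is added for completeness, and the futures are awaited so the printed sums are the completed values rather than a Future that may still be pending.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepVariants extends App {
  // Assumption: Akka 2.6+, so the implicit ActorSystem provides the materializer.
  implicit val system: ActorSystem = ActorSystem("keep-variants")

  val source: Source[Int, String] =
    Source(List(1, 2, 3)).mapMaterializedValue(_ => "Source Mat Value")
  // Explicit type parameters keep inference simple.
  val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  val left: String                = source.toMat(sink)(Keep.left).run()
  val right: Future[Int]          = source.toMat(sink)(Keep.right).run()
  val both: (String, Future[Int]) = source.toMat(sink)(Keep.both).run()
  val none: NotUsed               = source.toMat(sink)(Keep.none).run()

  // Blocking with Await is for demonstration only.
  println(s"left=$left")                                            // left=Source Mat Value
  println(s"right=${Await.result(right, 3.seconds)}")               // right=6
  println(s"both=(${both._1},${Await.result(both._2, 3.seconds)})") // both=(Source Mat Value,6)
  println(s"none=$none")

  system.terminate()
}
```

Each call to run() materializes the blueprint again, so every variant gets its own independent stream and its own materialised value.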

Please don't confuse the materialised value with the processing of elements in the stream.

Consider the following fold stages:

val flowFold: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val sinkFold: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

flowFold applies the fold function to every element in the stream and pushes a single value, the result of the fold, downstream. This element can be processed further if needed.

sinkFold, in contrast, is the final stage in a graph and cannot push elements further downstream. It uses its materialised value, a Future[Int], to return the fold result once the graph has processed all elements and completed.

If the value of Flow.fold is 55, should this be the materialised value of the flow instead of NotUsed?

No, the value 55 is not a materialised value. It is pushed as an element to the downstream sink.

You can "catch" the element 55 in a materialised value with the help of Sink.head:

val flow: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val alternativeFoldSink: Sink[Int, Future[Int]] = flow.toMat(Sink.head)(Keep.right)

Every stage can produce a materialised value, so why can't Flow.fold generate one?

Yes, every stage may produce a materialised value, but Flow.fold is designed not to. Most Flow definitions do not provide a meaningful materialised value. If you want the fold result as a materialised value, I'd suggest using Sink.fold.
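As a rough sketch of both routes for the 1 to 10 example from the question (assuming Akka 2.6+; the object and value names are made up for illustration), either Sink.fold or Flow.fold combined with Sink.head yields the sum through a Future:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FoldAsMaterializedValue extends App {
  // Assumption: Akka 2.6+, so the implicit ActorSystem provides the materializer.
  implicit val system: ActorSystem = ActorSystem("fold-demo")

  val numbers: Source[Int, NotUsed] = Source(1 to 10)

  // Route 1: fold inside the sink; runWith returns the sink's materialised value.
  val viaSinkFold: Future[Int] =
    numbers.runWith(Sink.fold[Int, Int](0)(_ + _))

  // Route 2: fold in a flow, then capture the single emitted element with Sink.head.
  val foldFlow: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
  val viaSinkHead: Future[Int] =
    numbers.via(foldFlow).toMat(Sink.head[Int])(Keep.right).run()

  // Blocking with Await is for demonstration only.
  println(Await.result(viaSinkFold, 3.seconds)) // 55
  println(Await.result(viaSinkHead, 3.seconds)) // 55

  system.terminate()
}
```

In both cases Keep.right (implicit in runWith) is what selects the sink's Future over the upstream NotUsed.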

Upvotes: 5
