joesan
joesan

Reputation: 15385

Akka Streams mapConcat Operator on Source Streams

I'm reading through the documentation for Akka streams and I came across the mapConcat operator which is like the flatMap (at least on the conceptual level).

Here is a simple example:

scala> val src = Source.fromFuture(Future.successful(1 to 10))
src: akka.stream.scaladsl.Source[scala.collection.immutable.Range.Inclusive,akka.NotUsed] = Source(SourceShape(FutureSource.out(51943878)))

I was expecting that type of the Source is rather:

akka.stream.scaladsl.Source[Future[scala.collection.immutable.Range.Inclusive],akka.NotUsed]

Why is that not the case?

My understanding of the types for each line is as shown below:

Source
  .fromFuture(Future.successful(1 to 10)) // Source[Future[Int]]
  .mapConcat(identity) // Source[Int]
  .runForeach(println)

But the Source type in the example above is not what I thought it was!

Upvotes: 4

Views: 2875

Answers (1)

Sebastian
Sebastian

Reputation: 17433

The signature of Source.fromFuture is:

def fromFuture[O](future: Future[O]): Source[O, NotUsed]

In your example O is of type scala.collection.immutable.Range.Inclusive and therefore the return type of Source.fromFuture is:

Source[scala.collection.immutable.Range.Inclusive, NotUsed]

Scala docs

Here is an example demonstrating the difference between map and mapConcat:

def f: Future[List[Int]] = Future.successful((1 to 5).toList)

def g(l: List[Int]): List[String] = l.map(_.toString * 2)

Source
  .fromFuture(f)
  .mapConcat(g) // emits 5 elements of type Int
  .runForeach(println)

Source
  .fromFuture(f)
  .map(g) // emits one element of type List[Int]
  .runForeach(println)

Upvotes: 6

Related Questions