oblivion

Reputation: 6548

How to create a Source from Future[Iterator]?

I have a Future[Iterator] and want to feed the iterator into my stream. I would like to construct a Source from it, as one normally does with Source.fromIterator.

But I can't use Source.fromIterator directly, because the iterator is wrapped in a Future.

Maybe I could use Source.fromFuture, but when I try it, it doesn't seem to create a Source from the Iterator in my case. From the docs:

/**
 * Starts a new `Source` from the given `Future`. The stream will consist of
 * one element when the `Future` is completed with a successful value, which
 * may happen before or after materializing the `Flow`.
 * The stream terminates with a failure if the `Future` is completed with a failure.
 */
def fromFuture[T](future: Future[T]): Source[T, NotUsed] =
  fromGraph(new FutureSource(future))

Upvotes: 3

Views: 2063

Answers (2)

Chunjef's answer best matches the requirements of the question.

I just wanted to point out that typically this kind of functionality is done within another Future via map or flatMap:

type Data = ???

val iterFuture : Future[Iterator[Data]] = ???

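// An implicit Materializer and ExecutionContext must be in scope.
val dataSeq : Future[Seq[Data]] = iterFuture flatMap { iter =>
  Source
    .fromIterator(() => iter)
    // runWith returns the Sink's Future[Seq[Data]]; .to(...).run() would return NotUsed
    .runWith(Sink.seq[Data])
}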

There is also the Future.traverse function if you don't want to use streams at all:

type OutputData = ???

val someCalculation : Data => Future[OutputData] = ???

val outputIterFuture : Future[Iterator[OutputData]] = 
  iterFuture flatMap { iter => Future.traverse(iter)(someCalculation) }
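
To make the Future.traverse variant concrete, here is a minimal sketch that uses only the Scala standard library. The element types (Int and String), the object name TraverseExample, and the body of someCalculation are hypothetical stand-ins for the placeholders above, not anything from the question.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseExample {
  // Hypothetical asynchronous step; stands in for `someCalculation` above.
  def someCalculation(n: Int): Future[String] = Future(s"item-$n")

  // Waits for the iterator, then applies the step to every element.
  // The resulting Future completes once all per-element Futures have completed.
  def transform(iterFuture: Future[Iterator[Int]]): Future[Iterator[String]] =
    iterFuture.flatMap(iter => Future.traverse(iter)(someCalculation))

  def main(args: Array[String]): Unit = {
    val iterFuture: Future[Iterator[Int]] = Future(Iterator(1, 2, 3))
    // Blocking with Await is only for this demo.
    val out = Await.result(transform(iterFuture), 5.seconds)
    println(out.toList)
  }
}
```

Note that Future.traverse walks the whole iterator and starts a Future for each element up front, so unlike a stream there is no backpressure. For a large or slow iterator the stream-based version is probably the safer choice.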

Upvotes: 1

Jeffrey Chung

Reputation: 19517

You could use a combination of Source.fromFuture, flatMapConcat, and Source.fromIterator. For example:

val futIter = Future(Iterator(1, 2, 3))

val source: Source[Int, _] =
  Source.fromFuture(futIter).flatMapConcat(iter => Source.fromIterator(() => iter))
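
For reference, a runnable sketch of the same approach. It assumes the akka-stream dependency is on the classpath and uses the Akka 2.5-style ActorMaterializer to match the Source.fromFuture API shown in the question (both are deprecated in Akka 2.6, where Source.future is the replacement). The object name FutureIteratorSource and the helper fromFutureIterator are hypothetical names chosen for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object FutureIteratorSource {
  // Hypothetical helper: Source.fromFuture emits the iterator as a single element,
  // and flatMapConcat replaces that element with a sub-source of the iterator's items.
  def fromFutureIterator[T](futIter: Future[Iterator[T]]): Source[T, NotUsed] =
    Source.fromFuture(futIter).flatMapConcat(iter => Source.fromIterator(() => iter))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the Future below

    val futIter: Future[Iterator[Int]] = Future(Iterator(1, 2, 3))

    // Blocking with Await is only for this demo.
    val result = Await.result(fromFutureIterator(futIter).runWith(Sink.seq[Int]), 5.seconds)
    println(result)

    Await.result(system.terminate(), 5.seconds)
  }
}
```

One caveat: because the function passed to Source.fromIterator hands back the same iterator instance each time, the resulting Source should be run only once. A second materialization would see an iterator that has already been consumed.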

Upvotes: 7
