Reputation: 6548
I have a Future[Iterator] and want to feed this iterator to my stream. I still want to construct a Source from the Iterator, as we do with Source.fromIterator, but I can't use Source.fromIterator directly because the iterator is wrapped in a Future.
Maybe I could use Source.fromFuture, but when I try it, it doesn't seem to create a Source from the Iterator's elements in my case. From the docs:
/**
 * Starts a new `Source` from the given `Future`. The stream will consist of
 * one element when the `Future` is completed with a successful value, which
 * may happen before or after materializing the `Flow`.
 * The stream terminates with a failure if the `Future` is completed with a failure.
 */
def fromFuture[T](future: Future[T]): Source[T, NotUsed] =
  fromGraph(new FutureSource(future))
Upvotes: 3
Views: 2063
Reputation: 17933
Chunjef's answer best matches the requirements of the question.
I just wanted to point out that this kind of functionality is typically done within another Future via map or flatMap:
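// Requires an implicit Materializer and ExecutionContext in scope.
type Data = Int // placeholder; substitute the real element type
val iterFuture: Future[Iterator[Data]] = ???
val dataSeq: Future[Seq[Data]] = iterFuture flatMap { iter =>
  Source
    .fromIterator(() => iter)
    // runWith keeps the Sink's Future[Seq[Data]]; .to(...).run() would yield NotUsed
    .runWith(Sink.seq[Data])
}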
There is also the Future.traverse function if you don't want to use streams at all:
type OutputData = String // placeholder; substitute the real result type
val someCalculation: Data => Future[OutputData] = ???
val outputIterFuture: Future[Iterator[OutputData]] =
  iterFuture flatMap { iter => Future.traverse(iter)(someCalculation) }
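For anyone who wants to try the Future.traverse variant without Akka, here is a minimal sketch that uses only the standard library. The names TraverseSketch, double, and run are hypothetical stand-ins (double plays the role of someCalculation), and the iterator is converted to a List first so that the result can be traversed more than once.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseSketch {
  // Hypothetical stand-in for someCalculation: doubles a value asynchronously.
  def double(n: Int): Future[Int] = Future(n * 2)

  def run(): List[Int] = {
    val iterFuture: Future[Iterator[Int]] = Future(Iterator(1, 2, 3))

    // Once the iterator is available, apply double to every element.
    // Future.traverse returns the results in the same order as the input.
    val doubled: Future[List[Int]] =
      iterFuture.flatMap(iter => Future.traverse(iter.toList)(double))

    // Blocking is acceptable in a demo; avoid it in production code.
    Await.result(doubled, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints List(2, 4, 6)
}
```

Keep in mind that Future.traverse starts the calculation for every element up front, so unlike a stream it applies no backpressure.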
Upvotes: 1
Reputation: 19517
You could use a combination of Source.fromFuture, flatMapConcat, and Source.fromIterator. For example:
val futIter = Future(Iterator(1, 2, 3))
val source: Source[Int, _] =
  Source.fromFuture(futIter).flatMapConcat(iter => Source.fromIterator(() => iter))
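A self-contained version of the same idea might look like the sketch below. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer and Source.fromFuture is deprecated in favour of Source.future; on Akka 2.5 you would keep Source.fromFuture and create an ActorMaterializer instead. The object name FutureIteratorSource and the run helper are made up for the demo.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FutureIteratorSource {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, so the implicit system also provides a Materializer.
    implicit val system: ActorSystem = ActorSystem("future-iterator-demo")
    import system.dispatcher // ExecutionContext for Future(...)

    val futIter: Future[Iterator[Int]] = Future(Iterator(1, 2, 3))

    // Source.future emits the Iterator itself as a single element;
    // flatMapConcat expands that element into a stream of its items.
    val source: Source[Int, NotUsed] =
      Source.future(futIter).flatMapConcat(iter => Source.fromIterator(() => iter))

    // Blocking is acceptable in a demo; avoid it in production code.
    try Await.result(source.runWith(Sink.seq[Int]), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the underlying Iterator can only be consumed once, a source built this way should only be materialized once; a second run would find the iterator already exhausted.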
Upvotes: 7