Reputation: 238
I already have a Source[T]
, but I need to pass it to a function that requires a Stream[T]
.
I could .run
the source and materialize everything to a list and then do a .toStream
on the result but that removes the lazy/stream aspect that I want to keep.
Is this the only way to accomplish this or am I missing something?
EDIT:
After reading Vladimir's comment, I believe I'm approaching my issue in the wrong way. Here's a simple example of what I have and what I want to create:
// Given a start index, returns a list from startIndex to startIndex+10. Stops at 50.
def getMoreData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] = {
println(s"f running with $startIndex")
val result: List[Int] = (startIndex until Math.min(startIndex + 10, 50)).toList
Future.successful(result)
}
So getMoreData
just emulates a service which returns data by the page to me.
My first goal it to create the following function:
def getStream(startIndex: Int)(implicit ec: ExecutionContext): Stream[Future[List[Int]]]
where the next Future[List[Int]]
element in the stream depends on the previous one, taking the last index read from the previous Future's value in the stream. Eg with a startIndex of 0:
getStream(0)(0) would return Future[List[0 until 10]]
getStream(0)(1) would return Future[List[10 until 20]]
... etc
Once I have that function, I then want to create a 2nd function to further map it down to a simple Stream[Int]
. Eg:
def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int]
Streams are beginning to feel like the wrong tool for the job and I should just write a simple loop instead. I liked the idea of streams because the consumer can map/modify/consume the stream as they see fit without the producer needing to know about it.
Upvotes: 1
Views: 796
Reputation: 17933
Scala Streams are a fine way of accomplishing your task within getStream
; here is a basic way to construct what you're looking for:
def getStream(startIndex : Int)
(implicit ec: ExecutionContext): Stream[Future[List[Int]]] =
Stream
.from(startIndex, 10)
.map(getMoreData)
Where things get tricky is with your getFlattenedStream
function. It is possible to eliminate the Future
wrapper around your List[Int]
values, but it will require an Await.result
function call which is usually a mistake.
More often than not it is best to operate on the Futures and allow asynchronous operations to happen on their own. If you analyze your ultimate requirement/goal it is usually not necessary to wait on a Future.
But, iff you absolutely must drop the Future then here is the code that can accomplish it:
val awaitDuration = 10 Seconds
def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int] =
stream
.map(f => Await.result(f, awaitDuration))
.flatten
Upvotes: 0