JBY

Reputation: 238

How to convert an Akka Source to a Scala Stream

I already have a Source[T], but I need to pass it to a function that requires a Stream[T].

I could .run the source, materialize everything into a list, and then call .toStream on the result, but that discards the lazy/stream aspect that I want to keep.

Is this the only way to accomplish this or am I missing something?

EDIT:

After reading Vladimir's comment, I believe I'm approaching my issue in the wrong way. Here's a simple example of what I have and what I want to create:

// Given a start index, returns a list from startIndex to startIndex+10.  Stops at 50.
def getMoreData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] = {
  println(s"f running with $startIndex")
  val result: List[Int] = (startIndex until Math.min(startIndex + 10, 50)).toList
  Future.successful(result)
}

So getMoreData just emulates a service that returns data to me one page at a time. My first goal is to create the following function:

def getStream(startIndex: Int)(implicit ec: ExecutionContext): Stream[Future[List[Int]]]

where the next Future[List[Int]] element in the stream depends on the previous one, taking the last index read from the previous Future's value in the stream. E.g., with a startIndex of 0:

getStream(0)(0) would return Future[List[0 until 10]]
getStream(0)(1) would return Future[List[10 until 20]]
... etc

Once I have that function, I then want to create a second function to map it down further to a simple Stream[Int]. E.g.:

def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int]

Streams are beginning to feel like the wrong tool for the job, and perhaps I should just write a simple loop instead. I liked the idea of streams because the consumer can map/modify/consume the stream as they see fit without the producer needing to know about it.

Upvotes: 1

Views: 796

Answers (1)

Scala Streams are a fine way of accomplishing your task within getStream; here is a basic way to construct what you're looking for:

def getStream(startIndex: Int)
             (implicit ec: ExecutionContext): Stream[Future[List[Int]]] =
  Stream
    .from(startIndex, 10)
    .map(getMoreData)
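
As a rough usage sketch (assuming the getMoreData from the question, reproduced here without the println, and the global ExecutionContext; the GetStreamDemo object name is only for illustration), indexing into the stream fetches pages on demand. Because Stream.from never ends, pages starting at index 50 or later come back as empty lists:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object GetStreamDemo {
  // From the question: one "page" of up to 10 values, nothing at or past 50.
  def getMoreData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.successful((startIndex until math.min(startIndex + 10, 50)).toList)

  def getStream(startIndex: Int)(implicit ec: ExecutionContext): Stream[Future[List[Int]]] =
    Stream.from(startIndex, 10).map(i => getMoreData(i))

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    // Bind the stream to a val before indexing into it.
    val pages = getStream(0)

    // Indexing forces the stream only up to the requested element.
    // Await is used here purely so the demo can print the values.
    println(Await.result(pages(1), 1.second)) // List(10, 11, 12, 13, 14, 15, 16, 17, 18, 19)
    println(Await.result(pages(5), 1.second)) // List()
  }
}
```

Note that getStream(0)(1) written literally would pass 1 as the implicit ExecutionContext argument and fail to compile, which is why the sketch binds the stream to a val first.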

Where things get tricky is with your getFlattenedStream function. It is possible to eliminate the Future wrapper around your List[Int] values, but doing so requires an Await.result call, which blocks the calling thread and is usually a mistake.

More often than not it is best to operate on the Futures and allow asynchronous operations to happen on their own. If you analyze your ultimate requirement/goal it is usually not necessary to wait on a Future.
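
One way to stay with Futures, sketched under assumptions: getAllData is a hypothetical helper (not part of the question), and treating an empty page as the end of the data is inferred from how the question's getMoreData behaves. Each request is chained off the previous page with flatMap, so the next start index really does come from the last value read:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PagedFetch {
  // From the question: one "page" of up to 10 values, nothing at or past 50.
  def getMoreData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.successful((startIndex until math.min(startIndex + 10, 50)).toList)

  // Hypothetical helper: keeps requesting pages until an empty one arrives.
  // Nothing blocks; the result is itself a Future.
  def getAllData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] =
    getMoreData(startIndex).flatMap {
      case Nil  => Future.successful(Nil)                      // assumed end-of-data signal
      case page => getAllData(page.last + 1).map(rest => page ++ rest)
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    // Blocking only at the very edge of the program, so the JVM does not exit early.
    val all = Await.result(getAllData(0), 5.seconds)
    println(all.size) // 50
  }
}
```

The trade-off is that this collects everything into one List, so it gives up the element-by-element laziness of a Stream in exchange for never blocking a thread.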

But if you absolutely must drop the Future, then here is code that can accomplish it:

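import scala.concurrent.Await
import scala.concurrent.duration._

// Upper bound on how long each page's Future may block the caller
val awaitDuration = 10.seconds

def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int] =
  stream
    .map(f => Await.result(f, awaitDuration)) // blocks until the page arrives
    .flatten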
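
One caveat worth sketching: with the infinite getStream above, the flattened stream never terminates on its own, because every page from index 50 onward is empty and flatten keeps searching for a next element. A possible variant, assuming an empty page marks the end of the data (getBoundedFlattenedStream is a hypothetical name, not from the question), cuts the stream off at the first empty page:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FlattenDemo {
  val awaitDuration: FiniteDuration = 10.seconds

  // From the question: one "page" of up to 10 values, nothing at or past 50.
  def getMoreData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.successful((startIndex until math.min(startIndex + 10, 50)).toList)

  def getStream(startIndex: Int)(implicit ec: ExecutionContext): Stream[Future[List[Int]]] =
    Stream.from(startIndex, 10).map(i => getMoreData(i))

  // Hypothetical variant: stop at the first empty page (assumed end-of-data),
  // so that the result is a finite Stream[Int].
  def getBoundedFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int] =
    stream
      .map(f => Await.result(f, awaitDuration)) // blocks per page, as before
      .takeWhile(_.nonEmpty)
      .flatten

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val ints = getBoundedFlattenedStream(getStream(0))
    println(ints.take(3).toList) // List(0, 1, 2)
    println(ints.toList.size)    // 50
  }
}
```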

Upvotes: 0
