Reputation: 2404
I've got a function that performs operations on an infinite stream originating from a socket.
def f1(s:Stream[T]):Stream[T] = s map {...} filter {...}
I've got another method that returns a finite sequence that I'd like to join to this stream.
def f2():Stream[T] = ...
Here's what I'm looking for:
val a = f2#:::f1(s)
The problem is, f2 will take some time to compute, and f1 needs to be outputting values as soon as possible. I'd like to wrap f2 in a Runnable so I can compute its value without blocking the rest of the program.
I'd like the value a to behave as follows: if f2 has finished computing, prepend its result to f1 and keep outputting values of that stream. Otherwise, keep outputting values of f1. How should I proceed?
Upvotes: 1
Views: 111
Reputation: 16324
You can build a custom class that extends Stream and define all the methods needed for your desired semantics. This approach gives you the most flexibility. You can then even enrich Stream so it has a withAsyncPrefix method. Below is a rough version for Iterator, which is a little simpler:
import scala.concurrent._
import scala.concurrent.duration._
import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global
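// Emits prefix elements once the future has completed; until then, emits from the main iterator.
// The field is named main because Iterator already defines a final iterator method on Scala 2.13+.
case class CombinedIterator[T](main: Iterator[T], prefix: Future[Iterator[T]]) extends Iterator[T] {
  // Blocks on the prefix only after the main iterator is exhausted.
  def hasNext: Boolean = main.hasNext || Await.result(prefix, Duration.Inf).hasNext
  // Non-blocking check: take from the prefix only if it has already completed successfully.
  def next(): T = prefix.value.collect {
    case Success(pfx) if pfx.hasNext => pfx.next()
  }.getOrElse(main.next())
}

// Enrichment that adds withAsyncPrefix to any Iterator.
implicit class AsyncPrefixable[T](val iterator: Iterator[T]) extends AnyVal {
  def withAsyncPrefix(prefix: Future[Iterator[T]]): Iterator[T] = CombinedIterator(iterator, prefix)
}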
So then you can do:
val iterator = Iterator.from(0).take(10000)
val prefix = Future { Thread.sleep(10); Iterator(-1) }
val iteratorWithPrefix = iterator.withAsyncPrefix(prefix)
iteratorWithPrefix.toList.indexOf(-1) //Run a few times, location can vary greatly within list
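Because the Thread.sleep example above is timing-dependent, here is a minimal deterministic sketch of the same idea. It uses a Promise to stand in for f2 finishing. The names AsyncPrefixSketch, PrefixedIterator and demo are hypothetical and only for illustration; the logic mirrors the iterator above rather than being a definitive implementation.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.Success

object AsyncPrefixSketch {
  // Hypothetical helper: same semantics as the combined iterator above.
  final class PrefixedIterator[T](main: Iterator[T], prefix: Future[Iterator[T]])
      extends Iterator[T] {
    // Only blocks on the prefix when the main iterator has run dry.
    def hasNext: Boolean =
      main.hasNext || Await.result(prefix, Duration.Inf).hasNext

    // Never blocks: reads the prefix only if the future is already complete.
    def next(): T = prefix.value match {
      case Some(Success(pfx)) if pfx.hasNext => pfx.next()
      case _                                 => main.next()
    }
  }

  def demo(): List[Int] = {
    val promise = Promise[Iterator[Int]]()                // stands in for the slow f2
    val it = new PrefixedIterator(Iterator.from(0), promise.future)

    val before = List(it.next(), it.next())               // prefix not ready yet
    promise.success(Iterator(-1, -2))                     // simulate f2 completing
    val after = List(it.next(), it.next(), it.next())     // prefix drains, then main resumes

    before ++ after
  }
}
```

Here demo() yields List(0, 1, -1, -2, 2): the two values read before the promise completes come from the main iterator, the prefix is emitted as soon as it is available, and the main iterator then resumes where it left off. To get back to the Stream shape from the question, calling toStream on the combined iterator (or to(LazyList) on Scala 2.13+) should work, though it is untested here.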
Upvotes: 1