Walrus the Cat

Reputation: 2404

asynchronously prepend to stream

I've got a function that performs operations on an infinite stream originating from a socket.

def f1(s:Stream[T]):Stream[T] = s map {...} filter {...}

I've got another method that returns a finite sequence that I'd like to join to this stream.

def f2():Stream[T] = ...

Here's what I'm looking for:

val a = f2() #::: f1(s)

The problem is, f2 will take some time to compute, and f1 needs to be outputting values as soon as possible. I'd like to wrap f2 in a Runnable so I can compute its value without blocking the rest of the program.

I'd like a to behave as follows: if f2 is done being computed, prepend it to f1 and keep outputting values of that stream. Else, keep outputting values of f1. How should I proceed?

Upvotes: 1

Views: 111

Answers (1)

Ben Reich

Reputation: 16324

You can build a custom class that extends Stream and define the methods needed for your desired semantics. This approach gives you the most flexibility. You can then enrich Stream so that it has a withAsyncPrefix method. Below is a rough version for Iterator, which is a little simpler:

import scala.concurrent._
import scala.concurrent.duration._
import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global

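// Yields elements of `prefix` once its future has completed; until then, elements of `iterator`.
case class CombinedIterator[T](iterator: Iterator[T], prefix: Future[Iterator[T]]) extends Iterator[T] {
    // Blocks on the prefix only after the main iterator is exhausted.
    def hasNext: Boolean = iterator.hasNext || Await.result(prefix, Duration.Inf).hasNext
    // Non-blocking: prefer a completed, non-empty prefix; otherwise fall back to the main iterator.
    def next(): T = prefix.value.collect {
        case Success(pfx) if pfx.hasNext => pfx.next()
    }.getOrElse(iterator.next())
}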

implicit class AsyncPrefixable[T](val iterator: Iterator[T]) extends AnyVal {
    def withAsyncPrefix(prefix: Future[Iterator[T]]): Iterator[T] = CombinedIterator(iterator, prefix)
}

So then you can do:

val iterator = Iterator.from(0).take(10000)
val prefix = Future { Thread.sleep(10); Iterator(-1) }
val iteratorWithPrefix = iterator.withAsyncPrefix(prefix)
iteratorWithPrefix.toList.indexOf(-1) //Run a few times, location can vary greatly within list
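
Since the question works with Stream rather than Iterator, here is a minimal sketch of the same idea written directly against Stream. It is an illustration under assumptions, not a drop-in solution: the object AsyncPrefixStream and the helper prefixed are hypothetical names, a Promise stands in for the slow f2 so the result is reproducible, and a prefix that has not completed (or has failed) by the time the main stream ends is simply dropped. Stream is deprecated in Scala 2.13 in favour of LazyList, so expect a deprecation warning there.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Success

object AsyncPrefixStream {
  // Hypothetical helper: re-checks the future each time another cell of the
  // resulting stream is forced, without ever blocking on it.
  def prefixed[T](main: Stream[T], prefix: Future[Stream[T]]): Stream[T] =
    prefix.value match {
      // Prefix is ready: splice it in once, then continue with the rest of main.
      case Some(Success(pfx)) => pfx #::: main
      // Not ready (or failed): emit the next element of main and check again later.
      case _ =>
        if (main.isEmpty) Stream.empty
        else main.head #:: prefixed(main.tail, prefix)
    }

  def main(args: Array[String]): Unit = {
    val promise = Promise[Stream[Int]]()                 // stands in for the slow f2
    val combined = prefixed(Stream.from(0), promise.future)

    println(List(combined.head, combined.tail.head))     // List(0, 1)
    promise.success(Stream(-1, -2))                      // "f2" finishes here
    println(combined.take(5).toList)                     // List(0, 1, -1, -2, 2)
  }
}
```

In real code the future would come from something like Future { f2() } with an ExecutionContext in scope. Because Stream memoizes its cells, elements that were forced before the future completed stay where they are, and the prefix appears at the first cell forced afterwards.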

Upvotes: 1
