Reputation: 341
I have a producer/consumer architecture where my producer returns an iterator and my consumer expects transformed results. Both are out of my control; my code is only responsible for transforming the source stream. The problem is that the source throughput is unreliable: it produces records at varying rates, and sometimes far too slowly.
Is it possible to terminate the stream from within a map stage? I do have a flag that I can set to kill the process. Note that I cannot place Futures and timeouts outside the consumer.
Things I tried:
Calling kill within a map. The drawback is that the check only runs when a record arrives, so if no record is generated for a while the condition is never triggered.
source.map(x=> {if(System.currentTimeMillis()>limit) kill(); x})
Another option is to use a while loop, but I can't yield results from inside a while.
while (source.hasNext) {
  Try(Await.result(Future { source.next() }, limit)) match {
    case Failure(e) => kill()
    case bla..
  }
}
Are there any other ways to approach this?
Upvotes: 3
Views: 205
Reputation: 341
Okay, I am going to piggyback on jwvh's answer and add the details of the iterator. I am using Try to prefetch the result of next, so we don't have to time futures twice: once for hasNext and once for next.
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Try}
case class TimedIterator[A](src : Iterator[A], timeout: Duration)
  extends Iterator[Try[A]] {
  // Returned by next() once the overall deadline has passed.
  private val fail: Try[A] = Failure(new TimeoutException("Iterator timed out after %s".format(timeout.toString)))
  // Fetches one element, waiting at most `timeout`; an exhausted source also yields a Failure.
  private def fetchNext(): Try[A] = Try(Await.result(Future{src.next()}, timeout))
  // Overall deadline, measured from construction.
  private val limitTime = System.currentTimeMillis() + timeout.toMillis
  private var _next: Try[A] = fetchNext()
  def hasNext :Boolean = _next.isSuccess
  def next() : Try[A] = {
    val res = if (System.currentTimeMillis() > limitTime) fail else _next
    _next = if (res.isSuccess) fetchNext() else res
    res
  }
}
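Since the iterator above yields Try[A] while the consumer in the question expects plain transformed values, some unwrapping step is probably needed. Here is a minimal sketch of one way to do that. The names TryIterators, untilFailure and onFailure are hypothetical and not part of the answer; onFailure stands in for the kill flag from the question. Note that, as written above, a per-element timeout surfaces as hasNext returning false rather than as a Failure element, so the hook would only fire for failures that next() actually returns.

```scala
import scala.util.{Failure, Success, Try}

object TryIterators {
  // Hypothetical helper: unwraps an Iterator[Try[A]] and ends cleanly at the
  // first Failure, passing the error to `onFailure` (e.g. to set a kill flag).
  def untilFailure[A](src: Iterator[Try[A]])(onFailure: Throwable => Unit): Iterator[A] =
    new Iterator[A] {
      // Element already pulled from src but not yet handed out.
      private var buffered: Option[A] = None
      private var done = false

      def hasNext: Boolean = {
        if (buffered.isEmpty && !done) {
          if (src.hasNext) {
            src.next() match {
              case Success(a) => buffered = Some(a)
              case Failure(e) =>
                done = true
                onFailure(e)
            }
          } else {
            done = true
          }
        }
        buffered.isDefined
      }

      def next(): A = {
        if (!hasNext) throw new NoSuchElementException("next on empty iterator")
        val a = buffered.get
        buffered = None
        a
      }
    }
}
```

Usage might look like TryIterators.untilFailure(TimedIterator(source, timeout))(_ => kill()).map(transform), where transform is whatever mapping the consumer needs.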
Upvotes: 0
Reputation: 51271
It's a little hard to grasp the situation without more details on the types you're dealing with.
I wonder if you can't just wrap the source Iterator with your own transformer Iterator.
class Transformer[A,B](src :A) extends Iterator[B] {
private var nextB :B = _
def hasNext :Boolean = {
// pull next element from src
// if successful load nextB and return true else return false
}
def next() :B = nextB
}
Then you can simply let toStream create a Stream[B] that will, at some point, have a termination.
sendToConsumer((new Transformer(source)).toStream)
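To make the skeleton above more concrete, here is a rough sketch under a few assumptions that are not in the answer: the source is an Iterator[A], the transformation is a plain function f, and "unsuccessful" means the source is exhausted, throws, or does not answer within timeout. The name TimedTransformer and its parameters are illustrative only. The fetched element is buffered so that calling hasNext repeatedly does not skip elements.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Illustrative only: wraps a possibly slow Iterator[A] and ends the stream
// when the source is exhausted, throws, or stays silent longer than `timeout`.
class TimedTransformer[A, B](src: Iterator[A], f: A => B, timeout: FiniteDuration)
    extends Iterator[B] {
  // Holds the element fetched by hasNext until next() hands it out.
  private var buffered: Option[B] = None
  private var finished = false

  def hasNext: Boolean = {
    if (buffered.isEmpty && !finished) {
      // Query the source on another thread and wait at most `timeout`.
      val pulled = Try(Await.result(
        Future { if (src.hasNext) Some(src.next()) else None }, timeout))
      pulled match {
        case Success(Some(a)) => buffered = Some(f(a))
        case _                => finished = true // exhausted, timed out, or source threw
      }
    }
    buffered.isDefined
  }

  def next(): B = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    val b = buffered.get
    buffered = None
    b
  }
}
```

One caveat with this approach: a timed-out Future is abandoned, not cancelled, so its thread stays blocked on the source until the source finally answers. If that matters, a dedicated ExecutionContext for the source calls may be a better fit than the global one.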
Upvotes: 1