shabbir hussain

Reputation: 341

Scala idiomatic way to time out an iterator map from within?

I have a producer/consumer architecture in which the producer returns an iterator and the consumer expects transformed results. Both are outside my control; my code is only responsible for transforming the source stream. The problem is that the source's throughput is unreliable: it produces records at varying rates, and sometimes far too slowly.

Is it possible to terminate the stream from within a map stage? I do have a flag that I can set to kill the process. Note that I cannot place Futures and timeouts outside the consumer.

Things I tried:

Calling kill from within a map. The drawback is that the check only runs when a record arrives, so if no record is generated for a while the condition is never triggered.

source.map { x => if (System.currentTimeMillis() > limit) kill(); x }

Another option is a while loop, but a while loop cannot yield results.

while (source.hasNext) {
    // Wait at most `limit` for the next record; a timeout becomes a Failure.
    Try(Await.result(Future { source.next() }, limit)) match {
        case Failure(e) => kill()
        case bla..
    }
}

Any innovative ideas for the same?

Upvotes: 3

Views: 205

Answers (2)

shabbir hussain

Reputation: 341

Okay, I am going to piggyback on jwvh's answer and add the details of the iterator. I use Try to prefetch the result of next, so we don't have to time Futures twice: once for hasNext and once for next.

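import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Try}

case class TimedIterator[A](src: Iterator[A], timeout: Duration)
  extends Iterator[Try[A]] {
    // Returned by next() once the overall deadline has passed.
    private val fail = Failure(new TimeoutException("Iterator timed out after %s".format(timeout.toString)))
    // Fetch one element on another thread, waiting at most `timeout`.
    // A timeout or an exhausted source surfaces as a Failure.
    private def fetchNext(): Try[A] = Try(Await.result(Future { src.next() }, timeout))

    // Deadline for the whole iteration, measured from construction.
    private val limitTime = System.currentTimeMillis() + timeout.toMillis
    // Prefetched result that the following next() call will return.
    private var _next: Try[A] = fetchNext()

    def hasNext: Boolean = _next.isSuccess
    def next(): Try[A] = {
      val res = if (System.currentTimeMillis() > limitTime) fail else _next
      _next   = if (res.isSuccess) fetchNext() else res
      res
    }
}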
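
Since this iterator yields Try[A] while the consumer presumably expects plain values, the results still have to be unwrapped somewhere. A minimal sketch, assuming it is acceptable to end the stream at the first failure; `results` is a hand-built stand-in for illustration, not output from the real source:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for what a TimedIterator[Int] might produce.
val results: Iterator[Try[Int]] = Iterator(
  Success(1),
  Success(2),
  Failure(new RuntimeException("timed out")),
  Success(4)
)

// Stop at the first Failure and unwrap everything that came before it.
val values: List[Int] =
  results.takeWhile(_.isSuccess).collect { case Success(v) => v }.toList
```

Both takeWhile and collect are lazy on an Iterator, so nothing is pulled from the source until the consumer asks for it.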

Upvotes: 0

jwvh

Reputation: 51271

It's a little hard to grasp the situation without more details on the types you're dealing with.

I wonder if you can't just wrap the source Iterator with your own transformer Iterator.

class Transformer[A, B](src: A) extends Iterator[B] {
  private var nextB: B = _
  def hasNext: Boolean = {
    // pull next element from src
    // if successful load nextB and return true else return false
    ??? // left unimplemented: the details depend on the type of src
  }

  def next(): B = nextB
}

Then you can simply let toStream create a Stream[B] that will, at some point, terminate.

sendToConsumer((new Transformer(source)).toStream)
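
To make the skeleton concrete, here is one way it might be filled in. This is only a sketch under assumptions the answer does not state: the source is taken to be an Iterator[A], and the names `TimedTransformer`, `f` and `timeout` are hypothetical. A fetch that exceeds the timeout is treated as the end of the stream.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical concrete version of the Transformer skeleton.
// `f` and `timeout` are illustrative parameters, not part of the original.
class TimedTransformer[A, B](src: Iterator[A], f: A => B, timeout: FiniteDuration)
    extends Iterator[B] {
  // Holds the element fetched by hasNext until next() hands it out.
  private var buffered: Option[B] = None
  // Set once the source is exhausted or a fetch has timed out.
  private var done = false

  def hasNext: Boolean = {
    if (buffered.isEmpty && !done) {
      // Pull and transform one element on another thread, waiting at most `timeout`.
      val pull = Future(if (src.hasNext) Some(f(src.next())) else None)
      buffered =
        try Await.result(pull, timeout)
        catch { case _: TimeoutException => None } // a timeout ends the stream
      done = buffered.isEmpty
    }
    buffered.isDefined
  }

  def next(): B = {
    if (!hasNext) throw new NoSuchElementException("source exhausted or timed out")
    val b = buffered.get
    buffered = None
    b
  }
}
```

Note that Await.result only stops waiting: the abandoned Future keeps its thread blocked inside the source until that call returns. Exceptions thrown by the source or by `f` are not swallowed and propagate to the caller.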

Upvotes: 1
