goerlitz

Reputation: 515

Iterate with hasNext() and next() over an asynchronously generated stream of elements

I have to implement an Iterator interface (as defined by a Java API), with hasNext() and next() methods, that should return result elements which originate from an asynchronously processed HTTP response (processed with Akka actors).

The following requirements have to be satisfied:

I have not looked into Java 8 streams or Akka streams yet. But since I basically have to iterate over a queue (a finite stream), I doubt that there is a suitable solution yet.

Currently, my Scala implementation stub uses java.util.concurrent.BlockingQueue and looks like this:

import java.util.concurrent.ArrayBlockingQueue
import akka.actor.Actor

case class Result(value: Any) // sent by result producing actor
case object Done              // sent by result producing actor when finished

class ResultStreamIterator extends Iterator[Result] {
    val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

    def hasNext(): Boolean = ???  // return true if not done yet
    def next(): Result = ???      // take() next element if not done yet

    class ResultCollector extends Actor {
        def receive = {
           case result: Result => resultQueue.put(Some(result)) // enqueue the whole Result
           case Done           => resultQueue.put(None)         // None marks the end of the stream
        }
    }
}

I use an Option[Result] to indicate the end of the result stream with None. I have experimented with peeking at the next element and using a 'done' flag but I hope that there is an easier solution.

Bonus questions:

Upvotes: 6

Views: 1777

Answers (3)

goerlitz

Reputation: 515

I followed jiro's suggestions and adapted them as needed. In general, I like the approach of implementing hasNext() and next() as ask messages sent to the actor. This ensures that only one thread modifies the queue at any time.

However, I'm not sure about the performance of this implementation: every call of hasNext() and next() goes through ask (which sets up a temporary actor for the reply) and Await.result (which blocks the calling thread).

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
import akka.pattern.ask
import akka.util.Timeout

case object HasNext
case object GetNext

case class Result(value: Any)
case object Done

class ResultCollector extends Actor with Stash {

  val queue = scala.collection.mutable.Queue.empty[Result]

  def collecting: Actor.Receive = {
    case HasNext       => if (queue.isEmpty) stash() else sender ! true
    case GetNext       => if (queue.isEmpty) stash() else sender ! queue.dequeue()
    case value: Result => unstashAll(); queue += value
    case Done          => unstashAll(); context become serving
  }

  def serving: Actor.Receive = {
    case HasNext => sender ! queue.nonEmpty
    case GetNext => sender ! { if (queue.nonEmpty) queue.dequeue else new NoSuchElementException }
  }

  def receive = collecting
}

class ResultStreamIteration(resultCollector: ActorRef) extends Iterator[Any] {

  implicit val timeout: Timeout = Timeout(30 seconds)

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case b: Boolean => b
  }

  override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match {
    case Result(value: Any) => value
    case e: Throwable       => throw e
  }
}

object Test extends App {
  implicit val exec = scala.concurrent.ExecutionContext.global
  val system = ActorSystem.create("Test")
  val actorRef = system.actorOf(Props[ResultCollector])
  Future {
    for (i <- 1 to 10000) actorRef ! Result(s"Result $i"); actorRef ! Done
  }
  val iterator = new ResultStreamIteration(actorRef)
  while (iterator.hasNext()) println(iterator.next())
  system.shutdown() // deprecated since Akka 2.4; use system.terminate() there
}

Upvotes: 0

Bubletan

Reputation: 3863

You could store the next element in a variable and wait for it at the beginning of both methods:

// null means "nothing fetched yet"; None is the end-of-stream marker from the queue
private var nextNext: Option[Result] = null

def hasNext(): Boolean = {
  if (nextNext == null) nextNext = resultQueue.take() // blocks until an element or the marker arrives
  nextNext.isDefined
}

def next(): Result = {
  if (nextNext == null) nextNext = resultQueue.take()
  if (nextNext.isEmpty) throw new NoSuchElementException()
  val result = nextNext.get
  nextNext = null // force the next call to fetch a fresh element
  result
}
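
For reference, here is a minimal self-contained sketch of the same look-ahead idea that runs without Akka. It is only an illustration under assumptions: Result is a stand-in for the question's type, QueueIterator and QueueIteratorDemo are made-up names, and a plain thread plays the role of the result-producing actor.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

// Hypothetical stand-in for the question's Result type.
final case class Result(value: Any)

// Iterates over a queue in which None marks the end of the stream.
class QueueIterator(queue: BlockingQueue[Option[Result]]) extends Iterator[Result] {
  private var buffered: Option[Result] = None // fetched by hasNext, not yet returned
  private var done = false                    // set once the None marker was taken

  override def hasNext: Boolean = {
    if (buffered.isEmpty && !done) {
      queue.take() match {                    // blocks until the producer puts something
        case some @ Some(_) => buffered = some
        case None           => done = true
      }
    }
    buffered.nonEmpty
  }

  override def next(): Result = {
    if (!hasNext) throw new NoSuchElementException("end of result stream")
    val result = buffered.get
    buffered = None
    result
  }
}

object QueueIteratorDemo {
  // Produces n results on a separate thread and drains them through the iterator.
  def collect(n: Int): List[Any] = {
    val queue = new ArrayBlockingQueue[Option[Result]](4)
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        (1 to n).foreach(i => queue.put(Some(Result(i))))
        queue.put(None) // end-of-stream marker
      }
    })
    producer.start()
    new QueueIterator(queue).map(_.value).toList
  }
}
```

Using a separate done flag instead of null keeps hasNext() safe to call repeatedly after the end marker has been consumed, since it never calls take() again.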

Upvotes: 0

jiro

Reputation: 160

The following code would satisfy the requirements. An actor's fields can be modified safely inside its receive method, so resultQueue should be a field of the actor rather than of the iterator.

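import scala.concurrent.Await
import scala.concurrent.duration.Duration

import akka.actor.{Actor, ActorRef, Stash}
import akka.pattern.ask
import akka.util.Timeout

// ResultCollector has to be initialized first, for example:
// resultCollector ! Initialize(100)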
class ResultStreamIterator(resultCollector: ActorRef) extends Iterator[Result] {

  implicit val timeout: Timeout = ???

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case ResponseHasNext(hasNext) => hasNext
  }

  @scala.annotation.tailrec
  final override def next(): Result = Await.result(resultCollector ? RequestResult, Duration.Inf) match {
    case ResponseResult(result) => result
    case Finished => throw new NoSuchElementException("There is no result.")
    case WaitingResult => next() // retries immediately; should wait for a moment first
  }

}

case object RequestResult
case object HasNext

case class ResponseResult(result: Result)
case class ResponseHasNext(hasNext: Boolean)
case object Finished
case object WaitingResult

case class Initialize(expects: Int)

// This code may be more elegant if it used Akka FSM.
// The actor's states are (beforeInitialized) -> (collecting) -> (allCollected)
class ResultCollector extends Actor with Stash {

  val results = scala.collection.mutable.Queue.empty[Result]

  var expects = 0

  var counts = 0

  var isAllCollected = false

  def beforeInitialized: Actor.Receive = {
    case Initialize(n) =>
      expects = n
      if (expects != 0) context become collecting
      else context become allCollected
      unstashAll()
    case _ => stash()
  }

  def collecting: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender ! WaitingResult
      else sender ! ResponseResult(results.dequeue())
    case HasNext => sender ! ResponseHasNext(true)
    case result: Result =>
      results += result
      counts += 1
      isAllCollected = counts >= expects
      if (isAllCollected) context become allCollected
  }

  def allCollected: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender ! Finished
      else sender ! ResponseResult(results.dequeue())
    case HasNext => sender ! ResponseHasNext(results.nonEmpty)
  }

  def receive = beforeInitialized
}
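
The "wait for a moment" note in next() could be handled roughly as below. This is only a sketch under assumptions: Reply, Ready, Pending and Exhausted are hypothetical stand-ins for ResponseResult, WaitingResult and Finished, and poll stands in for the blocking ask call, so the snippet runs without Akka.

```scala
import scala.annotation.tailrec

object PollingSketch {
  // Hypothetical reply types mirroring ResponseResult / WaitingResult / Finished.
  sealed trait Reply[+A]
  final case class Ready[A](value: A) extends Reply[A]
  case object Pending extends Reply[Nothing]
  case object Exhausted extends Reply[Nothing]

  // Polls until a value is ready, sleeping between attempts instead of
  // retrying immediately.
  @tailrec
  def nextWithPause[A](poll: () => Reply[A], pauseMillis: Long): A = poll() match {
    case Ready(value) => value
    case Exhausted    => throw new NoSuchElementException("There is no result.")
    case Pending =>
      Thread.sleep(pauseMillis) // give the producer time before asking again
      nextWithPause(poll, pauseMillis)
  }
}
```

The pause trades a little latency for far fewer round trips while the collector is still waiting for results. Stashing the request in the actor until a result arrives would avoid polling altogether.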

Upvotes: 0
