I have to implement an Iterator interface (as defined by a Java API) with hasNext() and next() methods that return result elements originating from an asynchronously processed HTTP response (processed with Akka actors).
The following requirements have to be satisfied:
I have not looked into Java 8 streams or Akka streams yet. But since I basically have to iterate over a queue (a finite stream), I doubt that they offer a suitable solution yet.
Currently, my Scala implementation stub uses java.util.concurrent.BlockingQueue and looks like this:
```scala
import java.util.concurrent.ArrayBlockingQueue
import akka.actor.Actor

case class Result(value: Any) // sent by result producing actor
case object Done              // sent by result producing actor when finished

class ResultStreamIterator extends Iterator[Result] {
  val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

  def hasNext(): Boolean = ??? // return true if not done yet
  def next(): Result = ???     // take() next element if not done yet

  class ResultCollector extends Actor {
    def receive = {
      case r: Result => resultQueue.put(Some(r)) // wrap each result
      case Done      => resultQueue.put(None)    // None marks the end of the stream
    }
  }
}
```
I use an Option[Result] so that None marks the end of the result stream. I have experimented with peeking at the next element and using a 'done' flag, but I hope there is an easier solution.
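For comparison, the Scala standard library can do the peeking itself. The following is a minimal sketch, assuming the actor only ever calls resultQueue.put (here a plain thread stands in for the ResultCollector actor, and SentinelDrain, resultIterator and demo are hypothetical names). Iterator.continually(queue.take()).takeWhile(_.isDefined) keeps one element of lookahead internally, so hasNext blocks only until the next Some(...) or the None marker arrives, and calling it twice does not consume an extra element.

```scala
import java.util.concurrent.ArrayBlockingQueue

object SentinelDrain {
  // Stand-in for the question's case class Result(value: Any).
  final case class Result(value: Any)

  // Lazily drains the queue until the None end marker. takeWhile keeps one element
  // of lookahead, so hasNext can be called repeatedly without consuming anything
  // extra, and it blocks on take() only until the next Some(...) or None arrives.
  def resultIterator(queue: ArrayBlockingQueue[Option[Result]]): Iterator[Result] =
    Iterator.continually(queue.take()).takeWhile(_.isDefined).map(_.get)

  // Hypothetical demo: a plain thread plays the ResultCollector actor and only calls put.
  def demo(n: Int): List[Any] = {
    val queue = new ArrayBlockingQueue[Option[Result]](100)
    val producer = new Thread(() => {
      for (i <- 1 to n) queue.put(Some(Result(s"Result $i")))
      queue.put(None) // end-of-stream marker, like the question's Done
    })
    producer.start()
    val values = resultIterator(queue).map(_.value).toList
    producer.join()
    values
  }
}
```

Because the iterator returned by takeWhile never pulls past the None marker, no separate 'done' flag is needed.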
Bonus questions:
I followed jiro's suggestions and adapted them where needed. In general, I like the approach of implementing hasNext() and next() as ask messages sent to the actor. This ensures that only one thread at a time modifies the queue.
However, I'm not sure about the performance of this implementation, because ask and Await.result add overhead to every call of hasNext() and next(): each ask creates a temporary actor and a Future, and Await.result blocks the calling thread until the reply arrives.
```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
import akka.pattern.ask
import akka.util.Timeout

case object HasNext
case object GetNext
case class Result(value: Any)
case object Done

class ResultCollector extends Actor with Stash {
  val queue = scala.collection.mutable.Queue.empty[Result]

  // Before Done: defer (stash) requests until a result is available.
  def collecting: Actor.Receive = {
    case HasNext => if (queue.isEmpty) stash() else sender() ! true
    case GetNext => if (queue.isEmpty) stash() else sender() ! queue.dequeue()
    case value: Result => unstashAll(); queue += value
    case Done => unstashAll(); context become serving
  }

  // After Done: answer immediately from whatever is left in the queue.
  def serving: Actor.Receive = {
    case HasNext => sender() ! queue.nonEmpty
    case GetNext => sender() ! { if (queue.nonEmpty) queue.dequeue() else new NoSuchElementException }
  }

  def receive = collecting
}

class ResultStreamIteration(resultCollector: ActorRef) extends Iterator[Any] {
  implicit val timeout: Timeout = Timeout(30 seconds)

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case b: Boolean => b
  }

  override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match {
    case Result(value) => value
    case e: Throwable => throw e
  }
}

object Test extends App {
  implicit val exec: ExecutionContext = ExecutionContext.global
  val system = ActorSystem.create("Test")
  val actorRef = system.actorOf(Props[ResultCollector])
  Future {
    for (i <- 1 to 10000) actorRef ! Result(s"Result $i")
    actorRef ! Done
  }
  val iterator = new ResultStreamIteration(actorRef)
  while (iterator.hasNext()) println(iterator.next())
  system.shutdown() // system.terminate() in Akka 2.4 and later
}
```
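Regarding the per-call cost: in this design every element costs two blocking round trips, one HasNext ask and one GetNext ask. One way to halve that is a lookahead iterator in which hasNext performs a single request and caches the element, and next() just hands it out. The sketch below is stdlib-only and assumes a hypothetical protocol (not part of the code above) where one request returns Some(element), or None at the end of the stream; the request function stands in for one round trip such as an ask followed by Await.result, and LookaheadIterator is a hypothetical name.

```scala
// Hypothetical: `request` stands in for one blocking round trip, e.g. an ask to the
// collector followed by Await.result, returning None once the stream is finished.
final class LookaheadIterator[A](request: () => Option[A]) extends Iterator[A] {
  private var buffered: Option[A] = None // fetched by hasNext, not yet returned by next()
  private var finished = false           // set once request() has returned None

  def hasNext: Boolean = {
    if (buffered.isEmpty && !finished) {
      buffered = request() // the only place a round trip happens
      finished = buffered.isEmpty
    }
    buffered.isDefined
  }

  def next(): A = {
    if (!hasNext) throw new NoSuchElementException("end of result stream")
    val a = buffered.get
    buffered = None
    a
  }
}
```

With this shape, n elements cost n + 1 requests instead of 2n, and repeated hasNext calls are free once an element is cached.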
You could store the next element in a variable and wait for it at the beginning of both methods:
```scala
// Uses the question's resultQueue: ArrayBlockingQueue[Option[Result]]
private var nextNext: Option[Result] = null // null = nothing fetched yet

def hasNext(): Boolean = {
  if (nextNext == null) nextNext = resultQueue.take()
  nextNext.nonEmpty
}

def next(): Result = {
  if (nextNext == null) nextNext = resultQueue.take()
  if (nextNext.isEmpty) throw new NoSuchElementException()
  val result = nextNext.get
  nextNext = null
  result
}
```
The following code should satisfy the requirements. An actor's fields can be modified safely inside its receive handler, because the actor processes one message at a time. So resultQueue should not be a field of the Iterator, but a field of the Actor.
```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import akka.actor.{Actor, ActorRef, Stash}
import akka.pattern.ask
import akka.util.Timeout

// Result is the question's case class Result(value: Any).
// ResultCollector must be initialized first, e.g.:
// resultCollector ! Initialize(100)
class ResultStreamIterator(resultCollector: ActorRef) extends Iterator[Result] {
  implicit val timeout: Timeout = ???

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case ResponseHasNext(hasNext) => hasNext
  }

  @scala.annotation.tailrec
  final override def next(): Result = Await.result(resultCollector ? RequestResult, Duration.Inf) match {
    case ResponseResult(result) => result
    case Finished => throw new NoSuchElementException("There is no result.")
    case WaitingResult => next() // should wait for a moment before retrying
  }
}

case object RequestResult
case object HasNext
case class ResponseResult(result: Result)
case class ResponseHasNext(hasNext: Boolean)
case object Finished
case object WaitingResult
case class Initialize(expects: Int)

// This code may be more elegant using akka.actor.FSM.
// Actor states: (beforeInitialized) -> (collecting) -> (allCollected)
class ResultCollector extends Actor with Stash {
  val results = scala.collection.mutable.Queue.empty[Result]
  var expects = 0
  var counts = 0
  var isAllCollected = false

  def beforeInitialized: Actor.Receive = {
    case Initialize(n) =>
      expects = n
      if (expects != 0) context become collecting
      else context become allCollected
      unstashAll()
    case _ => stash()
  }

  def collecting: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender() ! WaitingResult
      else sender() ! ResponseResult(results.dequeue())
    case HasNext => sender() ! ResponseHasNext(true) // more results are still expected
    case result: Result =>
      results += result
      counts += 1
      isAllCollected = counts >= expects
      if (isAllCollected) context become allCollected
  }

  def allCollected: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender() ! Finished
      else sender() ! ResponseResult(results.dequeue())
    case HasNext => sender() ! ResponseHasNext(results.nonEmpty)
  }

  def receive = beforeInitialized
}
```
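When the number of results is known in advance, as with Initialize(expects) here, the end-of-stream marker can be dropped altogether. The following is a minimal stdlib sketch, assuming the collector puts every result into a java.util.concurrent.BlockingQueue instead of answering asks (KnownCount, resultIterator and demo are hypothetical names). Iterator.fill evaluates its element argument lazily, so hasNext is just a counter check that never blocks, and each next() blocks on take() only for the element it returns.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

object KnownCount {
  // Yields exactly `expects` elements; queue.take() runs only when next() is called.
  def resultIterator[A](queue: BlockingQueue[A], expects: Int): Iterator[A] =
    Iterator.fill(expects)(queue.take())

  // Hypothetical demo: a plain thread stands in for the actor that receives results.
  def demo(n: Int): List[String] = {
    val queue = new ArrayBlockingQueue[String](16)
    val producer = new Thread(() => (1 to n).foreach(i => queue.put(s"Result $i")))
    producer.start()
    val out = resultIterator(queue, n).toList
    producer.join()
    out
  }
}
```

The trade-off is the same as in the actor version: if fewer results than expected ever arrive, the final next() never returns.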