David Portabella

Reputation: 12720

Scala: transform a callback pattern into a functional-style internal iterator

Suppose this API is given and we cannot change it:

object ProviderAPI {

  trait Receiver[T] {
    def receive(entry: T)
    def close()
  }

  def run(r: Receiver[Int]) {
    new Thread() {
      override def run() {
        (0 to 9).foreach { i =>
          r.receive(i)
          Thread.sleep(100)
        }
        r.close()
      }
    }.start()
  }
}

In this example, ProviderAPI.run takes a Receiver, calls receive(i) 10 times and then closes. Typically, ProviderAPI.run would call receive(i) based on a collection which could be infinite.

This API is intended to be used in imperative style, like an external iterator. If our application needs to filter, map and print this input, we need to implement a Receiver which mixes all these operations:

object Main extends App {
  class MyReceiver extends ProviderAPI.Receiver[Int] {
    def receive(entry: Int) {
      if (entry % 2 == 0) {
        println("Entry#" + entry)
      }
    }
    def close() {}
  }

  ProviderAPI.run(new MyReceiver())
}

Now, the question is how to use the ProviderAPI in a functional style, as an internal iterator, without changing the implementation of ProviderAPI, which is given to us. Note that ProviderAPI could also call receive(i) infinitely many times, so collecting everything in a list is not an option. We should also handle each result one by one, instead of collecting all the input first and processing it afterwards.

I am asking how to implement such a ReceiverToIterator, so that we can use the ProviderAPI in functional style:

object Main extends App {
  val iterator = new ReceiverToIterator[Int]  // how to implement this?
  ProviderAPI.run(iterator)
  iterator
    .view
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

Update

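Here are four solutions, each posted as an answer below: ReceiverToTraversable, PublishSubjectSolution, QueueIteratorSolution and IteratorWithSemaphorSolution.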

Upvotes: 4

Views: 1308

Answers (5)

David Portabella

Reputation: 12720

ReceiverToTraversable

This Stack Overflow question arose when I wanted to list and process an SVN repository using the svnkit.com API as follows:

SvnList svnList = new SvnOperationFactory().createList();
svnList.setReceiver(new ISvnObjectReceiver<SVNDirEntry>() {
  public void receive(SvnTarget target, SVNDirEntry dirEntry) throws SVNException {
    // do something with dirEntry
  }
});
svnList.run();

The API used a callback function, and I wanted to use a functional style instead, as follows:

svnList
  .filter(e => "pom.xml".compareToIgnoreCase(e.getName()) == 0)
  .map(_.getURL)
  .map(getMavenArtifact)
  .foreach(insertArtifact)

I thought of having a class ReceiverToIterator[T] extends ProviderAPI.Receiver[T] with Iterator[T], but this required the svnkit API to run in another thread. That's why I asked how to solve this problem with a ProviderAPI.run method that runs in a new thread. That was not very wise: if I had explained the real case, someone might have found a better solution sooner.

Solution

If we tackle the real problem (so there is no need to run svnkit in a separate thread), a simpler solution is to implement a scala.collection.Traversable instead of a scala.collection.Iterator. While Iterator requires a next and a hasNext def, Traversable requires only a foreach def, which is very similar to the svnkit callback!

Note that by using view, we make the transformers lazy, so elements are passed one by one through the whole chain to foreach(println). This makes it possible to process an infinite collection.

object ProviderAPI {
  trait Receiver[T] {
    def receive(entry: T)
    def close()
  }

  // Later I found out that I don't need a thread
  def run(r: Receiver[Int]) {
    (0 to 9).foreach { i => r.receive(i); Thread.sleep(100) }
  }
}

object Main extends App {
  new ReceiverToTraversable[Int](r => ProviderAPI.run(r))
    .view
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

class ReceiverToTraversable[T](val runProducer: (ProviderAPI.Receiver[T] => Unit)) extends Traversable[T] {
  override def foreach[U](f: (T) => U) = {
    object MyReceiver extends ProviderAPI.Receiver[T] {
      def receive(entry: T) = f(entry)
      def close() = {}
    }
    runProducer(MyReceiver)
  }
}
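The same push-style idea can be sketched without Traversable, which is only a deprecated alias for Iterable from Scala 2.13 onwards, so the class above may not compile there as written. The following is a minimal sketch, not the answer's implementation: Push, PushDemo and produce are hypothetical names, and produce stands in for a synchronous callback producer such as ProviderAPI.run.

```scala
// Hypothetical helper (not from the answer or the standard library):
// an internal iterator that wraps a callback-style producer.
final class Push[T](run: (T => Unit) => Unit) {
  // Runs the producer, handing every entry to f as it is pushed.
  def foreach(f: T => Unit): Unit = run(f)

  // Each stage wraps the downstream callback, so an entry travels through
  // the whole chain before the producer pushes the next one.
  def map[U](g: T => U): Push[U] =
    new Push[U](sink => run(t => sink(g(t))))

  def filter(p: T => Boolean): Push[T] =
    new Push[T](sink => run(t => if (p(t)) sink(t)))
}

object PushDemo {
  // Stand-in for a synchronous callback producer (assumption).
  def produce(callback: Int => Unit): Unit = (0 to 9).foreach(callback)

  // Collects the output only so that the result can be inspected.
  def collect(): List[String] = {
    val out = scala.collection.mutable.ListBuffer.empty[String]
    new Push[Int](produce)
      .filter(_ % 2 == 0)
      .map("Entry#" + _)
      .foreach { s => out += s; () }
    out.toList
  }
}
```

Nothing is buffered between stages, so this shape should also cope with a producer that never stops, as long as the final foreach does not accumulate entries the way collect() does here.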

Upvotes: 0

David Portabella

Reputation: 12720

PublishSubjectSolution

A very simple solution using PublishSubject from Netflix RxJava-Scala API:

// libraryDependencies += "com.netflix.rxjava" % "rxjava-scala" % "0.20.7"

import rx.lang.scala.subjects.PublishSubject

class MyReceiver[T] extends ProviderAPI.Receiver[T] {
  val channel = PublishSubject[T]()
  def receive(entry: T) { channel.onNext(entry) }
  def close() { channel.onCompleted() }
}

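object Main extends App {
  val myReceiver = new MyReceiver[Int]()
  // subscribe before starting the producer: a PublishSubject only delivers
  // items emitted after the subscription, so earlier entries would be lost
  myReceiver.channel.filter(_ % 2 == 0).map("Entry#" + _).subscribe{n => println(n)}
  ProviderAPI.run(myReceiver)
}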

Upvotes: 0

David Portabella

Reputation: 12720

QueueIteratorSolution

This is the second workaround that I originally attached to the question. I moved it here as an answer.

This solution uses a BlockingQueue[Option[T]], based on the suggestion of nadavwr. It allows the producer to continue producing up to queueCapacity entries before being blocked by the consumer. I implement a QueueToIterator that uses an ArrayBlockingQueue with a given capacity. BlockingQueue has a take() method, but no blocking peek and no hasNext, so I need an OptionNextToIterator as follows:

trait OptionNextToIterator[T] extends Iterator[T] {
  def getOptionNext: Option[T]   // abstract
  def hasNext = { ... }
  def next = { ... }
}

Note: I am using a synchronized block inside OptionNextToIterator, and I am not sure it is entirely correct.

Solution:

import java.util.concurrent.ArrayBlockingQueue

object Main extends App {
  val receiverToIterator = new ReceiverToIterator[Int](queueCapacity = 3)
  ProviderAPI.run(receiverToIterator)

  Thread.sleep(3000)  // test that ProviderAPI.run can produce 3 items ahead before being blocked by the consumer
  receiverToIterator.filter(_ % 2 == 0).map("Entry#" + _).foreach(println)
}

class ReceiverToIterator[T](val queueCapacity: Int = 1) extends ProviderAPI.Receiver[T] with QueueToIterator[T] {
  def receive(entry: T) { queuePut(entry) }
  def close() { queueClose() }
}

trait QueueToIterator[T] extends OptionNextToIterator[T] {
  val queueCapacity: Int
  val queue = new ArrayBlockingQueue[Option[T]](queueCapacity)
  var queueClosed = false

  def queuePut(entry: T) {
    if (queueClosed) { throw new IllegalStateException("The queue has already been closed."); }
    queue.put(Some(entry))
  }

  def queueClose() {
    queueClosed = true
    queue.put(None)
  }

  def getOptionNext = queue.take
}

trait OptionNextToIterator[T] extends Iterator[T] {
  def getOptionNext: Option[T]

  var answerReady: Boolean = false
  var eof: Boolean = false
  var element: T = _

  def hasNext = {
    prepareNextAnswerIfNecessary()
    !eof
  }

  def next = {
    prepareNextAnswerIfNecessary()
    if (eof) { throw new NoSuchElementException }
    val retVal = element
    answerReady = false
    retVal
  }

  def prepareNextAnswerIfNecessary() {
    if (answerReady) {
      return
    }
    synchronized {
      getOptionNext match {
        case None => eof = true
        case Some(e) => element = e
      }
      answerReady = true
    }
  }
}

Upvotes: 0

David Portabella

Reputation: 12720

IteratorWithSemaphorSolution

This is the first workaround that I originally attached to the question. I moved it here as an answer.

import java.util.concurrent.Semaphore

object Main extends App {
  val iterator = new ReceiverToIterator[Int]
  ProviderAPI.run(iterator)
  iterator
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

class ReceiverToIterator[T] extends ProviderAPI.Receiver[T] with Iterator[T] {
  var lastEntry: T = _
  var waitingToReceive = new Semaphore(1)
  var waitingToBeConsumed = new Semaphore(1)
  var eof = false

  waitingToReceive.acquire()

  def receive(entry: T) {
    println("ReceiverToIterator.receive(" + entry + "). START.")
    waitingToBeConsumed.acquire()
    lastEntry = entry
    waitingToReceive.release()
    println("ReceiverToIterator.receive(" + entry + "). END.")
  }

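  def close() {
    println("ReceiverToIterator.close().")
    // wait until the last entry has been consumed; otherwise eof could be
    // set while that entry is still pending and hasNext would skip it
    waitingToBeConsumed.acquire()
    eof = true
    waitingToReceive.release()
  }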

  def hasNext = {
    println("ReceiverToIterator.hasNext().START.")
    waitingToReceive.acquire()
    waitingToReceive.release()
    println("ReceiverToIterator.hasNext().END.")
    !eof
  }

  def next = {
    println("ReceiverToIterator.next().START.")
    waitingToReceive.acquire()
    if (eof) { throw new NoSuchElementException }
    val entryToReturn = lastEntry
    waitingToBeConsumed.release()
    println("ReceiverToIterator.next().END.")
    entryToReturn
  }
}

Upvotes: 0

nadavwr

Reputation: 1830

Updated: BlockingQueue of 1 entry

What you've implemented here is essentially Java's BlockingQueue, with a queue size of 1.

Main characteristic: uber-blocking. A slow consumer will kill your producer's performance.

Update: @gzm0 mentioned that BlockingQueue doesn't cover EOF. You'll have to use BlockingQueue[Option[T]] for that.

Update: Here's a code fragment. It can be made to fit with your Receiver.
Some of it is inspired by Iterator.buffered. Note that peek is a misleading name, as it may block, and so will hasNext.

// fairness enabled: blocked threads are served in FIFO order
// (entries are always FIFO; fairness only matters with several producers or consumers)
// alternatively, disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the previous peeked head has been invalidated
// specifically, it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead() { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true
  }
  head
}

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get
  }
}
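One way the fragment above might be fitted to a Receiver is sketched below. This is an illustration under assumptions, not the answerer's code: the local Receiver trait mirrors ProviderAPI.Receiver from the question, BlockingReceiver and BlockingReceiverDemo are hypothetical names, and a plain thread stands in for ProviderAPI.run.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Local stand-in mirroring ProviderAPI.Receiver from the question (assumption).
trait Receiver[T] {
  def receive(entry: T): Unit
  def close(): Unit
}

// Hypothetical class: Some(x) carries an entry, None marks end of stream.
final class BlockingReceiver[T](capacity: Int) extends Receiver[T] with Iterator[T] {
  private val queue = new ArrayBlockingQueue[Option[T]](capacity)
  private var peeked: Option[T] = None
  private var peekedDefined = false

  // Producer side: put blocks while the queue is full.
  def receive(entry: T): Unit = queue.put(Some(entry))
  def close(): Unit = queue.put(None)

  // Consumer side: take blocks while the queue is empty.
  private def peekOption: Option[T] = {
    if (!peekedDefined) { peeked = queue.take(); peekedDefined = true }
    peeked
  }

  def hasNext: Boolean = peekOption.isDefined

  def next(): T = peekOption match {
    case Some(e) => peekedDefined = false; e
    // The end marker stays cached, so later hasNext calls return false
    // instead of blocking on an empty queue.
    case None => throw new NoSuchElementException("end of stream")
  }
}

object BlockingReceiverDemo {
  def collect(): List[String] = {
    val r = new BlockingReceiver[Int](capacity = 3)
    // Stand-in for ProviderAPI.run: pushes 0 to 9 from another thread.
    new Thread(new Runnable {
      def run(): Unit = { (0 to 9).foreach(r.receive); r.close() }
    }).start()
    r.filter(_ % 2 == 0).map("Entry#" + _).toList
  }
}
```

The sketch assumes a single consumer thread, since peeked and peekedDefined are not synchronized.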

Alternative: Iteratees

Iterator-based solutions will generally involve more blocking. Conceptually, you could use continuations on the thread doing the iteration to avoid blocking the thread, but continuations mess with Scala's for-comprehensions, so no joy down that road.

Alternatively, you could consider an iteratee-based solution. Iteratees are different from iterators in that the consumer isn't responsible for advancing the iteration: the producer is. With iteratees, the consumer basically folds over the entries pushed by the producer over time. Folding each next entry as it becomes available can take place in a thread pool, since the thread is relinquished after each fold completes.

You won't get nice for-syntax for iteration, and the learning curve is a little challenging, but if you feel confident using a foldLeft you'll end up with a non-blocking solution that is reasonably easy on the eye.
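The fold-driven idea can be illustrated without any iteratee library. The sketch below is a deliberately simplified stand-in, not Play's or Scalaz's iteratee API: the local Receiver trait mirrors ProviderAPI.Receiver, FoldingReceiver and FoldDemo are hypothetical names, and a plain thread plays the producer. The producer drives the fold one entry at a time, and the consumer only sees the final state through a Future.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Local stand-in mirroring ProviderAPI.Receiver from the question (assumption).
trait Receiver[T] {
  def receive(entry: T): Unit
  def close(): Unit
}

// Hypothetical helper: folds the entries pushed by the producer and
// completes a Future with the final state when the producer closes.
final class FoldingReceiver[T, S](zero: S)(step: (S, T) => S) extends Receiver[T] {
  // Touched only by the producer thread; published through the Promise.
  private var state: S = zero
  private val done = Promise[S]()

  def receive(entry: T): Unit = { state = step(state, entry) }
  def close(): Unit = { done.success(state) }
  def result: Future[S] = done.future
}

object FoldDemo {
  def sumOfEvens(): Int = {
    val r = new FoldingReceiver[Int, Int](0)((acc, i) => if (i % 2 == 0) acc + i else acc)
    // Stand-in for ProviderAPI.run: pushes 0 to 9 from another thread.
    new Thread(new Runnable {
      def run(): Unit = { (0 to 9).foreach(r.receive); r.close() }
    }).start()
    // Blocking here is only for the demo; real code would map over the Future.
    Await.result(r.result, 5.seconds)
  }
}
```

Unlike the iterator-based variants, no consumer thread waits between entries here. An infinite producer would never complete the Future, though, so per-entry side effects would have to happen inside step.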

To read more about iteratees, I suggest taking a peek at PlayFramework 2.X's iteratee reference. The documentation describes their stand-alone iteratee library, which is 100% usable outside the context of Play. Scalaz 7 also has a comprehensive iteratee library.

Upvotes: 1
