Wade

Reputation: 356

Stream data from a Play Enumerator through Spray using Chunked Responses

I have data being pulled from ReactiveMongo that I need to push through a Spray REST API. I had hoped to do this with chunked responses. However, I have discovered that the Enumerator that comes back from ReactiveMongo can push data through Spray faster than the network connection can handle. The result is that the connection is terminated.

I was able to work around this problem by using the Spray Ack feature in an intermediate actor. This, along with a blocking Await, allowed me to create backpressure on the Enumerator. However, I don't really want the Await. I would like to find a way to stream the data through Spray in a non-blocking fashion.

Is this possible? I have a few ideas that might work if I can fill in the missing pieces.

1) Create backpressure on the Enumerator in a non-blocking fashion. (I have no idea how to do this. Suggestions?)

2) Break the enumerator into smaller enumerators. Start consuming each enumerator only once the previous one has completed. I can do this using an actor. What I lack here is a way to break the larger enumerator into smaller enumerators.

3) Use something like the "Enumeratee.take" method: take some number of records from the Enumerator, then, when I am ready, take some more. This is really just the same solution as 2) from a slightly different perspective. It would require the enumerator to maintain state, however. Is there a way to use Enumeratee.take multiple times against the same enumerator without restarting from the beginning each time?

Can anyone offer alternative suggestions that might work? If it is not possible, please let me know.

I am using Play Enumerators 2.3.5

Upvotes: 3

Views: 286

Answers (2)

Wade

Reputation: 356

After a fair amount of experimentation (and help from Stack Overflow), I was able to figure out a solution that seems to work. It uses Spray chunked responses and builds an iteratee around them.

The relevant code snippets are included here:

ChunkedResponder.scala

package chunkedresponses

import akka.actor.{Actor, ActorRef}
import spray.http.HttpHeaders.RawHeader
import spray.http._

object ChunkedResponder {
  case class Chunk(data: HttpData)
  case object Shutdown
  case object Ack
}

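class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor {
  import ChunkedResponder._

  // Initial state: nothing has been sent to the client yet.
  def receive: Receive = {
    case chunk: Chunk =>
      // The first chunk opens the chunked response. forward keeps the original
      // sender, so the Ack goes back to whoever sent the Chunk.
      responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(contentType, chunk.data))).withAck(Ack))
      context.become(chunking)
    case Shutdown =>
      // No data was ever sent: reply with an empty, non-chunked response.
      responder.forward(HttpResponse(headers = List(RawHeader("Content-Type", contentType.value))).withAck(Ack))
      context.stop(self)
  }

  // Streaming state: the chunked response has been started.
  def chunking: Receive = {
    case chunk: Chunk =>
      responder.forward(MessageChunk(chunk.data).withAck(Ack))
    case Shutdown =>
      responder.forward(ChunkedMessageEnd().withAck(Ack))
      context.stop(self)
  }
}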

ChunkIteratee.scala

package chunkedresponses

import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import play.api.libs.iteratee.{Done, Step, Input, Iteratee}
import spray.http.HttpData
import scala.concurrent.duration._

import scala.concurrent.{ExecutionContext, Future}

class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] {
  import ChunkedResponder._
  private implicit val timeout = Timeout(30.seconds)

  def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    // Accept more input only once the given future (the Ack) has completed.
    def waitForAck(future: Future[Any]): Iteratee[HttpData, Unit] = Iteratee.flatten(future.map(_ => this))

    def step(input: Input[HttpData]): Iteratee[HttpData, Unit] = input match {
      case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e)) // send the chunk, then wait for its Ack
      case Input.Empty => waitForAck(Future.successful(()))       // nothing to send
      case Input.EOF =>
        chunkedResponder ! Shutdown
        Done((), Input.EOF)
    }

    folder(Step.Cont(step))
  }
}

package.scala

import akka.actor.{ActorRefFactory, Props}
import play.api.libs.iteratee.Enumerator
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext

import scala.concurrent.ExecutionContext

package object chunkedresponses {
  implicit class ChunkedRequestContext(requestContext: RequestContext) {
    def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData])
                       (implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory): Unit = {
      val chunkedResponder = actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder)))
      val iteratee = new ChunkIteratee(chunkedResponder)
      enumerator.run(iteratee) // returns a Future[Unit]; it is discarded here
    }
  }
}

Upvotes: 0

lmm

Reputation: 17431

I think the idea is you implement an Iteratee whose fold method only calls the supplied callback after receiving the Spray Ack. Something like:

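// Sketch: sprayActor and the success/error/done cases are placeholders.
def handleData(input: Input[String]): Iteratee[String, Unit] = new Iteratee[String, Unit] {
  def fold[B](folder: Step[String, Unit] => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    // Ask the Spray actor to send the input; continue only once it has replied.
    (sprayActor ? input).flatMap {
      case success => folder(Step.Cont(handleData))
      case error => folder(Step.Error(...))
      case done => ...
    }
  }
}

val initialIteratee = new Iteratee[String, Unit] {
  def fold[B](folder: Step[String, Unit] => Future[B])(implicit ec: ExecutionContext): Future[B] =
    folder(Step.Cont(handleData))
}

enumerator.run(initialIteratee)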

This should be nonblocking but ensures the next chunk is only sent after the previous chunk has succeeded.
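
To illustrate just the sequencing idea without Play or Spray, here is a minimal sketch using only standard-library Futures. The names AckChainSketch, sendAndAck and sendAll are made up for this example, and sendAndAck is only a stand-in for `sprayActor ? chunk`; it is not how Spray itself delivers Acks.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}

object AckChainSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for `sprayActor ? chunk`: it records the chunk and
  // completes the returned Future, which plays the role of the Ack.
  val sent = ListBuffer.empty[String]
  def sendAndAck(chunk: String): Future[Unit] = Future {
    sent.synchronized { sent += chunk; () }
  }

  // Each send starts only inside the flatMap of the previous Ack,
  // so no thread blocks and chunks cannot overtake one another.
  def sendAll(chunks: Seq[String]): Future[Unit] =
    chunks.foldLeft(Future.successful(())) { (previousAck, chunk) =>
      previousAck.flatMap(_ => sendAndAck(chunk))
    }
}
```

Calling AckChainSketch.sendAll(Seq("a", "b", "c")) returns a Future that completes after the last Ack. The iteratee version does the same chaining one step at a time, because fold only hands out the next Cont once the ask Future has completed.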

Upvotes: 1
