Reputation: 356
I have data being pulled from Reactive Mongo that I need to push through a Spray Rest API. I had hoped to do this with Chunked Responses. However I have discovered that the Enumerator that comes back from Reactive Mongo is capable of pushing through Spray faster than the network connection can handle. The result is that the connection is terminated.
I was able to resolve this problem using the Spray Ack feature in an intermediate actor. This along with a Blocking Await allowed me to create backpressure on the Enumerator. However I don't really want the Await. I would like to figure out a way to stream the data through Spray in a non-blocking fashion.
Is this possible? I have few ideas that might work if I can fill in the missing pieces.
1) Create back pressure on the Enumerator in a non-blocking fashion (no idea how to do this. Suggestions?)
2) Break the enumerator into smaller enumerators. Start consuming each enumerator only once the previous one has completed. I can do this using an actor. What I lack here is a way to break the larger enumerator into smaller enumerators.
3) Use something like the "Enumeratee.take" method. Where I would take some number of records from the Enumerator, then when I am ready, take some more. This is really just the same solution as 2) but from a slightly different perspective. This would require the enumerator to maintain state however. Is there a way to use the Enumeratee.take multiple times against the same enumerator without restarting from the beginning each time?
Can anyone offer any alternate suggestions that might work? Or if it is not possible please let me know.
I am using Play Enumerators 2.3.5
Upvotes: 3
Views: 286
Reputation: 356
After a fair amount of experimentation (and help from stackoverflow) I was able to figure out a solution that seems to work. It uses Spray Chunked Responses and builds an iteratee around that.
The relevant code snippets are included here:
ChunkedResponder.scala
package chunkedresponses
import akka.actor.{Actor, ActorRef}
import spray.http.HttpHeaders.RawHeader
import spray.http._
object ChunkedResponder {
case class Chunk(data: HttpData)
case object Shutdown
case object Ack
}
class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor {
import ChunkedResponder._
def receive:Receive = {
case chunk: Chunk =>
responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(contentType, chunk.data))).withAck(Ack))
context.become(chunking)
case Shutdown =>
responder.forward(HttpResponse(headers = List(RawHeader("Content-Type", contentType.value))).withAck(Ack))
context.stop(self)
}
def chunking:Receive = {
case chunk: Chunk =>
responder.forward(MessageChunk(chunk.data).withAck(Ack))
case Shutdown =>
responder.forward(ChunkedMessageEnd().withAck(Ack))
context.stop(self)
}
}
ChunkIteratee.scala
package chunkedresponses
import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import play.api.libs.iteratee.{Done, Step, Input, Iteratee}
import spray.http.HttpData
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] {
import ChunkedResponder._
private implicit val timeout = Timeout(30.seconds)
def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = {
def waitForAck(future: Future[Any]):Iteratee[HttpData, Unit] = Iteratee.flatten(future.map(_ => this))
def step(input: Input[HttpData]):Iteratee[HttpData, Unit] = input match {
case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e))
case Input.Empty => waitForAck(Future.successful(Unit))
case Input.EOF =>
chunkedResponder ! Shutdown
Done(Unit, Input.EOF)
}
folder(Step.Cont(step))
}
}
package.scala
import akka.actor.{ActorContext, ActorRefFactory, Props}
import play.api.libs.iteratee.Enumerator
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext
import scala.concurrent.ExecutionContext
package object chunkedresponses {
implicit class ChunkedRequestContext(requestContext: RequestContext) {
def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData])
(implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory) {
val chunkedResponder = actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder)))
val iteratee = new ChunkIteratee(chunkedResponder)
enumerator.run(iteratee)
}
}
}
Upvotes: 0
Reputation: 17431
I think the idea is you implement an Iteratee
whose fold
method only calls the supplied callback after receiving the Spray Ack. Something like:
def handleData(input: Input[String]) = new Iteratee[String] {
def fold[B](folder: Step[Error, String] => Future[B]): Future[B] = {
(sprayActor ? input).flatMap {
case success => folder(Cont(handleData))
case error => folder(Error(...))
case done => ...
}
}
}
val initialIteratee = new Iteratee[String] {
def fold[B](folder: Step[Error, String] => Future[B]) = folder(Cont(handleData))
}
enumerator.run(initialIteratee)
This should be nonblocking but ensures the next chunk is only sent after the previous chunk has succeeded.
Upvotes: 1