Reputation: 315
I'm developing a client-server application using Akka HTTP and Akka Streams. The main idea is that the server feeds the HTTP response from an Akka Streams Source.
The problem is that the server accumulates several elements before sending the first message to the client. However, I need the server to send each element as soon as the source produces it.
Code example:
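import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import spray.json.{DefaultJsonProtocol, RootJsonFormat}

import scala.concurrent.duration._
import scala.io.StdIn

case class Example(id: Long, txt: String, number: Double)

object MyJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val exampleFormat: RootJsonFormat[Example] = jsonFormat3(Example)
}

// Produces numberOfBatches arrays of batchSize elements each,
// blocking the calling thread for `pause` before every batch.
class BatchIterator(batchSize: Int, numberOfBatches: Int, pause: FiniteDuration) extends Iterator[Array[Example]] {
  val range = Range(0, batchSize * numberOfBatches).iterator

  override def hasNext: Boolean = range.hasNext

  override def next(): Array[Example] = {
    println(s"Sleeping for ${pause.toMillis} ms")
    Thread.sleep(pause.toMillis)
    println(s"Taking $batchSize elements")
    Range(0, batchSize).map { _ =>
      val count = range.next()
      Example(count, s"Text$count", count * 0.5)
    }.toArray
  }
}

object Server extends App {
  import MyJsonProtocol._

  // Render the streamed JSON values separated by line breaks.
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withFramingRenderer(
      Flow[ByteString].intersperse(ByteString(System.lineSeparator))
    )

  implicit val system = ActorSystem("api")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  // 5 batches of 5 elements, with a 2-second pause before each batch.
  def fetchExamples(): Source[Array[Example], NotUsed] =
    Source.fromIterator(() => new BatchIterator(5, 5, 2.seconds))

  val route =
    path("example") {
      complete(fetchExamples())
    }

  val bindingFuture = Http().bindAndHandle(route, "localhost", 9090)
  println("Server started at localhost:9090")
  StdIn.readLine()
  bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}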
Then, if I execute:
curl --no-buffer localhost:9090/example
I get all the elements at the same time instead of receiving an element every 2 seconds.
Any idea about how I can "force" the server to send every element as it comes out from the source?
Upvotes: 0
Reputation: 315
I finally found the solution. The problem was that the source is synchronous: the iterator blocks its thread while it sleeps. So the solution is simply to call async on the source, which puts it behind an asynchronous boundary:
complete(fetchExamples.async)
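To see the effect outside of the HTTP layer, here is a minimal standalone sketch of the same idea. It assumes an Akka 2.5-style setup with ActorMaterializer, as in the question. AsyncBoundaryDemo, slowIterator and run are hypothetical names made up for this illustration; slowIterator is only a stand-in for BatchIterator. It is not a claim about what Akka HTTP does internally, just a way to watch when elements arrive downstream of the boundary.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryDemo {

  // Hypothetical stand-in for BatchIterator: blocks the calling thread before each element.
  def slowIterator(n: Int, pause: FiniteDuration): Iterator[Int] =
    Iterator.range(0, n).map { i =>
      Thread.sleep(pause.toMillis)
      i
    }

  // Runs the stream, prints when each element reaches the downstream stage,
  // and returns the elements in the order they arrived.
  def run(n: Int, pause: FiniteDuration): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val started = System.nanoTime()
    try {
      val result = Source
        .fromIterator(() => slowIterator(n, pause))
        .async // asynchronous boundary: the blocking source runs apart from the stages below
        .map { i =>
          val elapsedMs = (System.nanoTime() - started) / 1000000
          println(s"element $i arrived after ~$elapsedMs ms")
          i
        }
        .runWith(Sink.seq)
      Await.result(result, 1.minute)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    run(3, 500.millis)
  }
}
```

Note that an asynchronous boundary also puts a small buffer between the two sides, so the source may run slightly ahead of the consumer.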
Upvotes: 1