Dmitry Pavlov

Reputation: 31

Akka http-client can't consume all stream of data from server

I'm trying to write a simple Akka Streams REST endpoint and a client that consumes its stream. But when I run the server and the client, the client is able to consume only part of the stream, and I don't see any exception during execution.

Here are my server and client:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, Supervision}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import spray.json.DefaultJsonProtocol

import scala.io.StdIn
import scala.util.Random

object WebServer {

  object Model {
    case class Person(id: Int = Random.nextInt(), fName: String = Random.nextString(10), sName: String = Random.nextString(10))
  }

  object JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
    implicit val personFormat = jsonFormat(Model.Person.apply, "id", "firstName", "secondaryName")
  }

  def main(args: Array[String]) {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()

    implicit val executionContext = system.dispatcher

    val start = ByteString.empty
    val sep = ByteString("\n")
    val end = ByteString.empty

    import JsonProtocol._
    implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
      .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))
      .withParallelMarshalling(parallelism = 8, unordered = false)

    val decider: Supervision.Decider = {
      case ex: Throwable => {
        println("Exception occurs")
        ex.printStackTrace()
        Supervision.Resume
      }
    }

    val persons: Source[Model.Person, NotUsed] = Source.fromIterator(
      () => (0 to 1000000).map(id => Model.Person(id = id)).iterator
    )
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .map(p => { println(p); p })


    val route =
      path("persons") {
        get {
          complete(persons)
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine()

    bindingFuture
      .flatMap(_.unbind())
      .onComplete(_ => {
        println("Stopping http server ...")
        system.terminate()
      })
  }
}

and client:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}

import scala.util.{Failure, Success}

object WebClient {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher


    val request = HttpRequest(uri = Uri("http://localhost:8080/persons"))

    val response = Http().singleRequest(request)

    val attributes = ActorAttributes.withSupervisionStrategy {
      case ex: Throwable => {
        println("Exception occurs")
        ex.printStackTrace()
        Supervision.Resume
      }
    }
    response.map(r => {
      r.entity.dataBytes.withAttributes(attributes)
    }).onComplete {
      case Success(db) => db.map(bs => bs.utf8String).runForeach(println)
      case Failure(ex) => ex.printStackTrace()
    }
  }
}

It works for 100, 1,000, and 10,000 persons, but it does not work for more than 100,000. It looks like there is some limit on the stream, but I can't find it.

The last record printed by the server on my local machine is number 79101: Person(79101,ⰷ瑐劲죗醂竜泲늎制䠸,䮳硝沢并⎗ᝨᫌꊭᐽ酡)

The last record on the client is number 79048:

{"id":79048,"firstName":"췁頔䚐龫暀࡙頨捜昗㢵","secondaryName":"⏉ݾ袈庩컆◁ꄹ葪䑥Ϻ"}

Does anybody know why this happens?

Upvotes: 2

Views: 206

Answers (1)

Dmitry Pavlov

Reputation: 31

I found a solution: I have to explicitly call r.entity.withoutSizeLimit() on the client, and after that everything works as expected.
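For context, the truncation point is consistent with Akka HTTP's default entity size limit (config key akka.http.parsing.max-content-length, 8 MB by default): once the client has consumed that many bytes of the response entity, the stream is cut off. A minimal sketch of the fixed client-side response handling, adapted from the client code in the question:

```scala
import scala.util.{Failure, Success}

response.map { r =>
  // Lift the default 8 MB entity size limit (akka.http.parsing.max-content-length)
  // before consuming dataBytes; without this, the entity stream is truncated
  // once the limit is reached.
  r.entity.withoutSizeLimit().dataBytes.withAttributes(attributes)
}.onComplete {
  case Success(db) => db.map(bs => bs.utf8String).runForeach(println)
  case Failure(ex) => ex.printStackTrace()
}
```

Alternatively, the limit can be raised globally by setting akka.http.parsing.max-content-length in application.conf instead of calling withoutSizeLimit() per request.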

Upvotes: 1
