Martijn

Reputation: 2326

Akka HTTP Streaming JSON Deserialization

Is it possible to deserialize, on the fly, an external ByteString stream of unknown length from Akka HTTP into domain objects?


Context

I call an HTTP endpoint whose response never completes; it outputs a JSON array that keeps growing:

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight

Upvotes: 10

Views: 2294

Answers (3)

Dmitry K

Reputation: 1312

I think JsonFraming.objectScanner(Int.MaxValue) fits this case. As the docs state:

Returns a Flow that implements a "brace counting" based framing operator for emitting valid JSON chunks. It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. Typical examples of data that one may want to frame using this operator include: Very large arrays
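
To illustrate what "brace counting" means, here is a minimal sketch in plain Scala (standard library only). It is not Akka's implementation; the names BraceFraming and frames are hypothetical, and it assumes the input is an array of JSON objects, as in the question. It emits each top-level object as soon as its closing brace arrives, so the final "]" is never needed.

```scala
object BraceFraming {
  /** Splits a sequence of text chunks into complete top-level JSON objects.
    * Tracks brace depth and ignores braces that appear inside string literals.
    * Anything outside an object ('[', ',', whitespace, ']') is skipped.
    */
  def frames(chunks: Iterator[String]): Iterator[String] = {
    val current = new StringBuilder // characters of the object being assembled
    var depth = 0                   // number of currently open '{'
    var inString = false            // true while inside a JSON string literal
    var escaped = false             // true right after a backslash inside a string

    chunks.flatMap { chunk =>
      val completed = List.newBuilder[String]
      chunk.foreach { c =>
        if (inString) {
          current.append(c)
          if (escaped) escaped = false
          else if (c == '\\') escaped = true
          else if (c == '"') inString = false
        } else if (c == '{') {
          depth += 1
          current.append(c)
        } else if (depth > 0) {
          current.append(c)
          if (c == '"') inString = true
          else if (c == '}') {
            depth -= 1
            if (depth == 0) {
              // The outermost object just closed: emit it and start over.
              completed += current.toString
              current.clear()
            }
          }
        }
      }
      completed.result()
    }
  }
}
```

Feeding it the two chunks `[{"a":1},{"b"` and `:2},` yields `{"a":1}` and then `{"b":2}`, even though the array is never closed and the second object is split across chunks.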

So you can end up with something like this:

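import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{JsonFraming, Sink}
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Assumes an implicit ActorSystem and ExecutionContext are in scope,
// and that decode comes from whatever JSON library you use.
val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))

response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue)) // one ByteString per complete JSON object
      .map(_.utf8String)         // In case you have ByteString
      .map(decode[MyEntity](_))  // Use any Unmarshaller here
      .grouped(20)               // batches of up to 20 decoded entities
      .runWith(Sink.ignore)      // Do whatever you need here
  case Failure(exception) => log.error(exception, "Api call failed")
}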

Upvotes: 5

Bluejay

Reputation: 1

I had a very similar problem trying to parse the Twitter Stream (an infinite string) into a domain object. I solved it using Json4s, like this:

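import org.json4s._
import org.json4s.native.JsonMethods._ // assumes the json4s-native backend

case class Tweet(username: String, geolocation: Option[Geo])
case class Geo(latitude: Float, longitude: Float)
object Tweet {
    // extract needs implicit Formats in scope
    implicit val formats: Formats = DefaultFormats

    // Parses one JSON document (here: one line of the stream) into a Tweet
    def apply(s: String): Tweet = {
        parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
    }
}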

Then I just buffered the stream and mapped each line to a Tweet:

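import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream

// inputStream is the gzip-compressed HTTP response body; each line holds one JSON object
val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
var line = reader.readLine()
while (line != null) { // readLine returns null once the stream ends
    store(Tweet.apply(line))
    line = reader.readLine()
}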

Json4s has full support for Option (and for custom objects nested inside the object, like Geo in the example). Therefore, you can declare a field as an Option like I did, and if the field is missing from the JSON, it will be set to None.
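
As a small illustration of the Option behaviour, the sketch below extracts two sample lines, one without the geolocation field. It assumes the json4s-native module is on the classpath and that the file is compiled as a regular source file; OptionExtractionDemo, decodeTweet and the sample JSON are made up for this example.

```scala
import org.json4s._
import org.json4s.native.JsonMethods._

case class Geo(latitude: Float, longitude: Float)
case class Tweet(username: String, geolocation: Option[Geo])

object OptionExtractionDemo {
  // extract needs implicit Formats; DefaultFormats covers plain case classes
  implicit val formats: Formats = DefaultFormats

  // Parses one JSON document and maps it onto the Tweet case class
  def decodeTweet(line: String): Tweet = parse(line).extract[Tweet]

  def main(args: Array[String]): Unit = {
    // geolocation is absent here, so it is extracted as None
    println(decodeTweet("""{"username":"alice"}"""))
    // geolocation is present here, so it is extracted as Some(Geo(...))
    println(decodeTweet("""{"username":"bob","geolocation":{"latitude":52.1,"longitude":4.3}}"""))
  }
}
```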

Hope it helps!

Upvotes: 0

MrRontgen

Reputation: 132

I think play-iteratees-extras should help you. This library lets you parse JSON via the Enumerator/Iteratee pattern and, of course, does not wait for all the data to arrive.

For example, let's build an 'infinite' stream of bytes that represents an 'infinite' JSON array.

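import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import play.api.libs.json.Json
import play.api.mvc.Codec
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random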

var i = 0
var isFirstWas = false

val max = 10000

val stream = Enumerator("[".getBytes) andThen Enumerator.generateM {
  Future {
    i += 1
    if (i < max) {
      val json = Json.stringify(Json.obj(
        "prop" -> Random.nextBoolean(),
        "prop2" -> Random.nextBoolean(),
        "prop3" -> Random.nextInt(),
        "prop4" -> Random.alphanumeric.take(5).mkString("")
      ))

      val string = if (isFirstWas) {
        "," + json
      } else {
        isFirstWas = true
        json
      }


      Some(Codec.utf_8.encode(string))
    } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag
    else None

  }
}

OK, this stream produces a JSON array of roughly 10000 objects (raise max for more). Let's define a case class that will hold the data of each object in the array.

case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)

Now write a parser that will parse each item:

import play.extras.iteratees._    
import JsonBodyParser._
import JsonIteratees._
import JsonEnumeratees._

val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
  for {
    prop <- json.\("prop").asOpt[Boolean]
    prop2 <- json.\("prop2").asOpt[Boolean]
    prop3 <- json.\("prop3").asOpt[Int]
    prop4 <- json.\("prop4").asOpt[String]
  } yield Props(prop, prop2, prop3, prop4)
}

Please see the docs for jsArray, jsValues and jsSimpleObject. To build the result producer:

val result = stream &> Encoding.decode() ><> parser

Encoding.decode() (from play-iteratees-extras) decodes the bytes into CharString. The result value has type Enumerator[Option[Props]], and you can apply an iteratee to this enumerator to start the parsing process.

Overall, I don't know how you receive the bytes (the solution depends heavily on this), but I think this shows one possible solution to your problem.

Upvotes: -1
