Reputation: 2326
Is it possible to dynamically deserialize an external ByteString stream of unknown length
from Akka HTTP into domain objects?
I call an HTTP endpoint whose response
is an infinitely long JSON array
that keeps growing:
[
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
...
] <- Never sees the light of day
Upvotes: 10
Views: 2294
Reputation: 1312
I guess that JsonFraming.objectScanner(Int.MaxValue)
should be used in this case. As the docs state:
Returns a Flow that implements a "brace counting" based framing operator for emitting valid JSON chunks. It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. Typical examples of data that one may want to frame using this operator include: Very large arrays
So you can end up with something like this:
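import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{JsonFraming, Sink}
import io.circe.generic.auto._ // assuming circe: derives the Decoder for MyEntity
import io.circe.parser.decode
import scala.concurrent.Future
import scala.util.{Failure, Success}
// Assumes an implicit ActorSystem and ExecutionContext, a LoggingAdapter `log`,
// and a case class MyEntity matching one array element.
val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))
response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue)) // one ByteString per complete {...} object
      .map(_.utf8String) // ByteString -> String
      .map(decode[MyEntity](_)) // Either[io.circe.Error, MyEntity]; any unmarshaller works here
      .grouped(20)
      .runWith(Sink.ignore) // Do whatever you need here
  case Failure(exception) => log.error(exception, "Api call failed")
}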
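To make the "brace counting" idea concrete, here is a minimal, stdlib-only sketch of it. It is not Akka's implementation (which, among other things, fails the stream once an object exceeds the maximumObjectLength passed to objectScanner); ObjectFramer and feed are hypothetical names. It assumes the top-level array contains only objects, tracks string literals so braces inside strings are ignored, and carries a partial object over to the next chunk:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical brace-counting framer: feed it arbitrary text chunks and it returns
// every complete top-level {...} object finished in that chunk. Characters outside
// objects (the array's "[", "]", commas, whitespace) are skipped.
// Assumes the top-level array contains only objects.
final class ObjectFramer {
  private val current = new StringBuilder
  private var depth = 0        // nesting level of { } outside string literals
  private var inString = false // currently inside a "..." literal
  private var escaped = false  // previous char was a backslash inside a string

  def feed(chunk: String): Vector[String] = {
    val out = ArrayBuffer.empty[String]
    chunk.foreach { c =>
      if (depth > 0) current.append(c) // inside an object: keep every char
      if (inString) {
        if (escaped) escaped = false
        else if (c == '\\') escaped = true
        else if (c == '"') inString = false
      } else c match {
        case '"' if depth > 0 => inString = true
        case '{' =>
          if (depth == 0) current.append(c) // opening brace of a new object
          depth += 1
        case '}' if depth > 0 =>
          depth -= 1
          if (depth == 0) { out += current.toString; current.clear() }
        case _ => ()
      }
    }
    out.toVector
  }
}
```

Feeding it `[{"a":1},{"b"` returns only `{"a":1}`; the partial `{"b"` is kept and completed by a later chunk, which is what makes a never-closing array workable.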
Upvotes: 5
Reputation: 1
I had a very similar problem trying to parse the Twitter Stream (an infinite string) into a domain object. I solved it using Json4s, like this:
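import org.json4s._
import org.json4s.jackson.JsonMethods.parse // or org.json4s.native.JsonMethods

case class Tweet(username: String, geolocation: Option[Geo])
case class Geo(latitude: Float, longitude: Float)
object Tweet {
  implicit val formats: Formats = DefaultFormats // required by extract
  def apply(s: String): Tweet =
    parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
}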
Then I buffer the stream and map each line to a Tweet:
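import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream

// inputStream: the gzip-compressed response body; store: your own persistence function.
// Assumes one JSON object per line, which is how the Twitter stream delimits tweets.
val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
var line = reader.readLine()
while (line != null) {
  store(Tweet(line))
  line = reader.readLine()
}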
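The same loop can be written without the mutable var by wrapping readLine in a lazy Iterator. A stdlib-only sketch; GzipLines, lines and gzipped are hypothetical names, and gzipped exists only so the example runs without a network connection:

```scala
import java.io.{BufferedReader, ByteArrayInputStream, ByteArrayOutputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipLines {
  // Lazily yields each line of a gzip-compressed, UTF-8, newline-delimited stream.
  // The iterator ends when readLine returns null (end of stream);
  // closing the underlying stream is left to the caller.
  def lines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(
      new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  // Demo helper: gzip a string in memory and expose it as an InputStream.
  def gzipped(s: String): InputStream = {
    val bytes = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bytes)
    gz.write(s.getBytes(StandardCharsets.UTF_8))
    gz.close() // writes the gzip trailer
    new ByteArrayInputStream(bytes.toByteArray)
  }
}
```

With a real connection you would pass the response's InputStream to lines and then `.map(Tweet(_)).foreach(store)`; because the Iterator is lazy, each tweet is handled as soon as its line arrives.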
Json4s fully supports Option fields (and nested case classes, like Geo in the example). So if you declare a field as an Option, as I did, and it is missing from the JSON, it is set to None.
Hope it helps!
Upvotes: 0
Reputation: 132
I think play-iteratees-extras should help you. This library lets you parse JSON with the Enumerator/Iteratee pattern, without waiting for all the data to arrive.
For example, let's build an 'infinite' stream of bytes that represents an 'infinite' JSON array.
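import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import play.api.libs.json.Json
import play.api.mvc.Codec
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Random

var i = 0
var isFirstWas = false // true once the first element has been emitted
val max = 10000
val stream = Enumerator("[".getBytes) andThen Enumerator.generateM {
  Future {
    i += 1
    if (i < max) {
      val json = Json.stringify(Json.obj(
        "prop" -> Random.nextBoolean(),
        "prop2" -> Random.nextBoolean(),
        "prop3" -> Random.nextInt(),
        "prop4" -> Random.alphanumeric.take(5).mkString("")
      ))
      // every element after the first is prefixed with a comma
      val string = if (isFirstWas) {
        "," + json
      } else {
        isFirstWas = true
        json
      }
      Some(Codec.utf_8.encode(string))
    } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag
    else None // None ends the enumerator
  }
}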
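Without Play, the same 'infinite' array can be modelled as a lazy Iterator of text chunks. A stdlib-only sketch with hypothetical names (JsonArrayChunks, chunks): passing None never emits the closing ], like the endpoint in the question, and the fixed Random seed only makes runs repeatable:

```scala
import scala.util.Random

object JsonArrayChunks {
  // One array element with the same four props as in the question.
  private def element(rnd: Random): String =
    s"""{"prop":${rnd.nextBoolean()},"prop2":${rnd.nextBoolean()},"prop3":${rnd.nextInt()},"prop4":"${rnd.alphanumeric.take(5).mkString}"}"""

  // Emits "[" first, then the elements, each one after the first prefixed with ",".
  // Some(n) closes the array after n elements; None never closes it.
  def chunks(count: Option[Int], rnd: Random = new Random(42)): Iterator[String] = {
    val elems = Iterator.from(0).map(i => (if (i == 0) "" else ",") + element(rnd))
    count match {
      case Some(n) => Iterator("[") ++ elems.take(n) ++ Iterator("]")
      case None    => Iterator("[") ++ elems
    }
  }
}
```

Using a counter inside the Iterator instead of shared vars keeps the "first element has no comma" rule local to the generator.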
OK, this value contains a JSON array of max - 1 (here 9999) objects; raise max for more. Let's define a case class that holds the data of each object in our array.
case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)
Now write a parser that parses each item:
import play.extras.iteratees._
import JsonBodyParser._
import JsonIteratees._
import JsonEnumeratees._

val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
  for {
    prop <- (json \ "prop").asOpt[Boolean]
    prop2 <- (json \ "prop2").asOpt[Boolean]
    prop3 <- (json \ "prop3").asOpt[Int]
    prop4 <- (json \ "prop4").asOpt[String]
  } yield Props(prop, prop2, prop3, prop4)
}
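The for-comprehension yields Some(Props(...)) only if all four lookups succeed; a single missing or mistyped field turns the whole result into None. A stdlib-only sketch of that all-or-nothing behaviour, using a Map[String, Any] as a hypothetical stand-in for the parsed JSON object (PropsFromMap and field are illustrative names, not part of play-iteratees-extras):

```scala
case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)

object PropsFromMap {
  // Typed lookup: None if the key is missing or the value has the wrong type.
  private def field[A](m: Map[String, Any], key: String)(pf: PartialFunction[Any, A]): Option[A] =
    m.get(key).collect(pf)

  // Same shape as the parser's for-comprehension: every step must be Some.
  def toProps(m: Map[String, Any]): Option[Props] = for {
    prop  <- field(m, "prop")  { case b: Boolean => b }
    prop2 <- field(m, "prop2") { case b: Boolean => b }
    prop3 <- field(m, "prop3") { case i: Int => i }
    prop4 <- field(m, "prop4") { case s: String => s }
  } yield Props(prop, prop2, prop3, prop4)
}
```

This is why the resulting stream carries Option values: malformed elements show up as None instead of failing the whole stream.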
Please see the docs for jsArray, jsValues and jsSimpleObject. To build the result producer:
val result = stream &> Encoding.decode() ><> parser
Encoding.decode() from play-iteratees-extras decodes the bytes as CharString. The result value has type Enumerator[Option[Props]], and you can apply an iteratee to this enumerator to start the parsing process.
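Decoding bytes chunk by chunk has one subtlety: a multi-byte UTF-8 character can be split across two chunks. The following stdlib-only sketch (Utf8ChunkDecoder is a hypothetical name, not the library's code) uses java.nio's CharsetDecoder with endOfInput = false, so an incomplete trailing byte sequence stays in the buffer and is completed by the next chunk:

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CodingErrorAction, StandardCharsets}

// Incremental UTF-8 decoder: bytes of a character split across chunks are kept
// and prepended to the next chunk instead of being decoded as garbage.
final class Utf8ChunkDecoder {
  private val decoder = StandardCharsets.UTF_8.newDecoder()
    .onMalformedInput(CodingErrorAction.REPLACE)
    .onUnmappableCharacter(CodingErrorAction.REPLACE)
  private var leftover: Array[Byte] = Array.emptyByteArray

  def decode(chunk: Array[Byte]): String = {
    val in = ByteBuffer.wrap(leftover ++ chunk)
    // UTF-8 never yields more chars than bytes, so this buffer cannot overflow.
    val out = CharBuffer.allocate(in.remaining() + 1)
    decoder.decode(in, out, false) // false: stop before an incomplete trailing sequence
    leftover = new Array[Byte](in.remaining())
    in.get(leftover) // keep the unconsumed bytes for the next call
    out.flip()
    out.toString
  }
}
```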
Overall, I don't know how you receive the bytes (the solution depends heavily on this), but I think this shows one possible solution to your problem.
Upvotes: -1