user3139545

Reputation: 7394

Decode chunked JSON with AKKA Stream

I have a Source[ByteString, _] from an input file with 3 rows like this (in reality the input is a TCP socket with a continuous stream):

{"a":[2
33]
}

Now the problem is that I want to parse this into a Source[ChangeMessage, _]; however, the only examples I have found deal with the case where there is a whole JSON message on every row, not the case where each JSON message can be fragmented over multiple rows.

One example I found is this library; however, it expects } or , as the last character, that is, one JSON message per row. The example below shows this setup.

"My decoder" should "decode chunked json" in {
    implicit val sys = ActorSystem("test")
    implicit val mat = ActorMaterializer()
    val file = Paths.get("chunked_json_stream.json")
    val data = FileIO.fromPath(file)
    .via(CirceStreamSupport.decode[ChangeMessage])
    .runWith(TestSink.probe[ChangeMessage])
    .request(1)
    .expectComplete()
  }

Another alternative would be to use a fold, balance { and }, and only emit when a whole JSON message is completed. The problem with this is that the fold operator only emits when the stream completes, and since this is a continuous stream I cannot use it here.

My question is: what is the fastest way to parse chunked JSON streams in Akka Streams, and is there any available software that already does this? If possible I would like to use circe.

Upvotes: 6

Views: 1703

Answers (2)

bszwej

Reputation: 420

As the documentation of knutwalker/akka-stream-json says:

This flow even supports parsing multiple json documents in whatever fragmentation they may arrive, which is great for consuming stream/sse based APIs.

In your case, all you need to do is delimit the incoming ByteStrings:

"My decoder" should "decode chunked json" in {
    implicit val sys = ActorSystem("test")
    implicit val mat = ActorMaterializer()
    val file = Paths.get("chunked_json_stream.json")

    val sourceUnderTest =
      FileIO.fromPath(file)
        .via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true))
        .via(CirceStreamSupport.decode[ChangeMessage])

    sourceUnderTest
      .runWith(TestSink.probe[ChangeMessage])
      .request(1)
      .expectNext(ChangeMessage(List(233)))
      .expectComplete()
}

That's because, when reading from a file, the ByteString elements contain multiple lines and therefore Circe is not able to parse them as well-formed JSON. When you delimit by newline, each element in the stream is a separate line and therefore Circe is able to parse it using the aforementioned feature.

Upvotes: 3

Vladimir Matveev

Reputation: 127961

Unfortunately, I'm not aware of any Scala libraries which support stream-based parsing of JSON. It seems to me that some support for this is available in Google Gson, but I'm not entirely sure it can properly handle "broken" input.

What you can do, however, is collect JSON documents in a streaming fashion, similarly to what Framing.delimiter does. This is very similar to the alternative you have mentioned, but it does not use fold(). If you go this way, you would need to mimic what Framing.delimiter does, except that instead of looking for a single delimiter you balance curly braces (and optionally square brackets, if top-level arrays are possible), buffering the intermediate data until the entire document has come through, at which point you emit it as a single chunk suitable for parsing.
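A minimal sketch of such a stage, built on statefulMapConcat rather than a custom GraphStage, could look like the following. It assumes the stream contains only top-level JSON objects; JsonObjectFraming is an illustrative name, not an existing utility, and a production version would also enforce a maximum buffer size and report malformed input.

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// Sketch of a brace-balancing "framing" stage: it buffers incoming bytes and
// emits one ByteString per complete top-level JSON object, no matter how the
// input is fragmented. String literals and escape characters are tracked so
// that braces inside strings are not counted.
object JsonObjectFraming {
  val flow: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].statefulMapConcat { () =>
      val buffer   = ByteString.newBuilder
      var depth    = 0      // current nesting depth of { ... }
      var inString = false  // inside a JSON string literal?
      var escaped  = false  // previous character was a backslash?

      bytes => {
        val completed = Vector.newBuilder[ByteString]
        bytes.foreach { byte =>
          buffer.putByte(byte)
          val ch = byte.toChar
          if (inString) {
            if (escaped) escaped = false
            else if (ch == '\\') escaped = true
            else if (ch == '"') inString = false
          } else ch match {
            case '"' => inString = true
            case '{' => depth += 1
            case '}' =>
              depth -= 1
              if (depth == 0) {              // a whole document is buffered
                completed += buffer.result()
                buffer.clear()
              }
            case _ => ()                     // whitespace, commas, digits, ...
          }
        }
        completed.result()
      }
    }
}

Placed in front of a decoder such as CirceStreamSupport.decode[ChangeMessage], this would turn an arbitrarily fragmented byte stream into one element per complete document.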

Just as a side note, an appropriate interface for a streaming JSON parser suitable to be used in Akka Streams could look like this:

trait Parser {
  def update(data: Array[Byte])  // or String
  def pull(): Option[Either[Error, JsonEvent]]
}

where pull() returns None if it cannot produce any more events from the data consumed so far but there are no actual syntactic errors in the incoming document, and JsonEvent is some standard structure for describing the events of a streaming parser (i.e. a sealed trait with subclasses like BeginObject, BeginArray, EndObject, EndArray, String, etc.). If you find such a library or create one, you can use it to parse data coming from an Akka stream of ByteStrings.
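For illustration, a parser exposing this hypothetical interface could be driven from a stream of ByteStrings roughly like this (Parser, JsonEvent and Error are the assumed types from the trait above; the events helper and JsonEventStream object are made up for the example):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

object JsonEventStream {
  // Sketch: feed each incoming ByteString into the (hypothetical) pull-based
  // parser and emit every JsonEvent it can produce so far. The Left (error)
  // case is silently dropped here; a real adapter would fail the stream on a
  // syntax error instead.
  def events(newParser: () => Parser): Flow[ByteString, JsonEvent, NotUsed] =
    Flow[ByteString].statefulMapConcat { () =>
      val parser = newParser()
      bytes => {
        parser.update(bytes.toArray)
        Iterator
          .continually(parser.pull())
          .takeWhile(_.isDefined)
          .collect { case Some(Right(event)) => event }
          .toVector
      }
    }
}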

Upvotes: 0
