lony

Reputation: 7819

How to extract JSON from a binary protobuf?

Consider an Apache Spark 2.2.0 structured stream with the following schema:

jsonStream.printSchema()
root
 |-- body: binary (nullable = true)

The data inside body is a Protocol Buffers v2 message carrying nested JSON. The schema looks like this:

syntax = "proto2";

message Data {
  required string data = 1;
}

message List {
  repeated Data entry = 1;
}

How can I extract the data inside Spark to process it further?

I looked into ScalaPB, but since I run my code in Jupyter I couldn't get the ".proto" definitions included inline. I also do not know how to convert a DataFrame to an RDD on a stream; calling .rdd failed because of the streaming source.

Update 1: I figured out how to generate Scala files from protobuf specifications using the console tool of ScalaPB. Still, I'm not able to import them because of a "type mismatch".

Upvotes: 1

Views: 2697

Answers (1)

Jacek Laskowski

Reputation: 74779

tl;dr Write a user-defined function (UDF) to deserialize the binary field (a protobuf message that carries JSON) to JSON.

Think of the serialized body (in binary format) as a table column. Forget about Structured Streaming for a moment (and streaming Datasets).

Let me then rephrase the question as follows:

How to convert (aka cast) a value in binary to [here-your-format]?

Some formats are directly castable, which makes converting binaries to strings as easy as this:

$"body" cast "string"

If the resulting string is JSON or a Unix timestamp, you could then use the built-in "converters", i.e. functions like from_json or from_unixtime.
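For illustration, a minimal sketch of that two-step cast-then-parse, pretending body held plain JSON (the column and schema names here are made up):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}
import spark.implicits._ // spark is your SparkSession; enables the $"..." column syntax

// Hypothetical schema of the JSON document inside the payload
val jsonSchema = new StructType().add("data", StringType)

val parsed = jsonStream
  .select($"body".cast("string") as "json")          // binary -> string
  .select(from_json($"json", jsonSchema) as "entry") // string -> struct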

The introduction above should give you a hint about how to approach conversions like yours.

The data inside body is of type Protocol Buffers v2 and a nested JSON.

To deal with such fields (protobuf + JSON), you'd have to write a Scala function that decodes the "payload" to JSON and register it as a user-defined function (UDF) using udf:

udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction Defines a Java UDF1 instance as user-defined function (UDF). The caller must specify the output data type, and there is no automatic input type coercion. By default the returned UDF is deterministic. To change it to nondeterministic, call the API UserDefinedFunction.asNondeterministic().

Then use functions like from_json or get_json_object.

To make your case simpler, write a single-argument function that does the conversion and wrap it in a UDF using the udf function.
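A minimal sketch of such a UDF, assuming ScalaPB generated classes from the .proto above (the package name of the generated code is a guess; adjust it to your setup):

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Assumption: ScalaPB generated these classes from the .proto above;
// the actual package depends on your proto file name and options.
import myproto.list.{List => ProtoList}

// Deserialize the protobuf bytes and collect the JSON payloads.
val decodeBody = udf { bytes: Array[Byte] =>
  ProtoList.parseFrom(bytes).entry.map(_.data) // a Seq of JSON strings
}

val decoded = jsonStream.select(decodeBody($"body") as "jsons")

Each element of jsons is then an ordinary JSON string that from_json or get_json_object can take the rest of the way.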


Trying .rdd failed because of a streaming source.

Use Dataset.foreach or foreachPartition.

foreach(f: (T) ⇒ Unit): Unit Applies a function f to all rows.

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit Applies a function f to each partition of this Dataset.
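On a streaming Dataset, that per-row logic runs through the foreach sink. A minimal sketch, reusing decoded from the UDF sketch above (the body of process is just a placeholder):

import org.apache.spark.sql.{ForeachWriter, Row}

// Per-row processing of the streaming Dataset via the foreach sink
val query = decoded.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(row) // your per-row logic here
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()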

Upvotes: 2
