Reputation: 7819
Considering an Apache Spark 2.2.0 Structured Stream such as:
jsonStream.printSchema()
root
|-- body: binary (nullable = true)
The data inside body is of type Protocol Buffers v2 and contains nested JSON. It looks like:
syntax = "proto2";
message Data {
required string data = 1;
}
message List {
repeated Data entry = 1;
}
How can I extract the data inside Spark to "further" process it?
I looked into ScalaPB, but as I run my code in Jupyter I couldn't get the ".proto" code to be included inline. I also do not know how to convert a DataFrame to an RDD on a stream; trying .rdd failed because of a streaming source.
Update 1: I figured out how to generate Scala files from protobuf specifications using the console tool of ScalaPB. Still, I'm not able to import them because of a "type mismatch".
Upvotes: 1
Views: 2697
Reputation: 74779
tl;dr Write a user-defined function (UDF) to deserialize the binary field (protobuf carrying JSON) to JSON.
Think of the serialized body (in binary format) as a table column. Forget about Structured Streaming for a moment (and streaming Datasets).
Let me then rephrase the question to the following:
How to convert (aka cast) a value in binary to [here-your-format]?
Some formats are directly cast-able, which makes converting binaries to strings as easy as:
$"body" cast "string"
If the string is then a JSON or a unixtime you could use the built-in "converters", i.e. functions like from_json or from_unixtime.
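For instance, a minimal sketch of the cast-then-parse pattern (pretending for a moment that body were plain JSON text; the schema and column names are hypothetical, and spark.implicits._ is assumed to be in scope for $):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical schema of the JSON payload
val payloadSchema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val parsedJson = jsonStream
  .select($"body" cast "string" as "json")              // binary -> string
  .select(from_json($"json", payloadSchema) as "data")  // string -> struct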
The introduction should give you a hint how to do conversions like yours.
The data inside body is of type Protocol Buffers v2 and contains nested JSON.
To deal with such fields (protobuf + JSON) you'd have to write a Scala function to decode the "payload" to JSON and create a user-defined function (UDF) using udf:
udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction
Defines a Java UDF1 instance as user-defined function (UDF). The caller must specify the output data type, and there is no automatic input type coercion. By default the returned UDF is deterministic. To change it to nondeterministic, call the API UserDefinedFunction.asNondeterministic().
Then use functions like from_json or get_json_object.
To make your case simpler, write a single-argument function that does the conversion and wrap it into a UDF using the udf function, as sketched below.
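A minimal sketch of such a UDF, assuming the ScalaPB-generated case classes for the .proto above are available (the import path is hypothetical, as the generated package depends on the .proto file name and options; the generated List clashes with scala.List, hence the alias):

import org.apache.spark.sql.functions.{explode, from_json, udf}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
// Hypothetical package; adjust to wherever ScalaPB put your generated classes.
import myproto.data.{List => PbList}

// Deserialize the protobuf payload; every Data.data field carries a JSON string.
val deserialize = udf { bytes: Array[Byte] =>
  PbList.parseFrom(bytes).entry.map(_.data)
}

// Hypothetical schema of the nested JSON (same as in the sketch above)
val payloadSchema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val parsed = jsonStream
  .select(explode(deserialize($"body")) as "json")      // one row per Data entry
  .select(from_json($"json", payloadSchema) as "data")  // JSON string -> struct

Note that the generated classes (and the ScalaPB runtime) must be on the executors' classpath, which tends to be the stumbling block in notebooks like Jupyter.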
Trying .rdd failed because of a streaming source.
Use Dataset.foreach or foreachPartition.
foreach(f: (T) ⇒ Unit): Unit Applies a function f to all rows.
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit Applies a function f to each partition of this Dataset.
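For a streaming Dataset the foreach hook is exposed through writeStream as a ForeachWriter; a minimal sketch, reusing the hypothetical parsed Dataset from above:

import org.apache.spark.sql.{ForeachWriter, Row}

// Per-row side effects on a streaming Dataset via the foreach sink.
val query = parsed.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(row)  // replace with your processing
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()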
Upvotes: 2