Reputation: 423
I am trying to write a Kafka consumer (of a protobuf) using Structured Streaming. Let's call the protobuf A; it should be deserialized from a byte array (Array[Byte]) in Scala.
I have tried every method I could find online but still cannot figure out how to correctly parse message A.
Method 1: Following the integration guide (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html), I cast value as String. But even if I call getBytes to convert the string back to bytes in order to parse my message A, I get:
Exception in thread "main" java.lang.ExceptionInInitializerError
...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
Method 2: I tried to convert value directly to a byte array, but I get:
missing ')' at '['(line 1, pos 17)
== SQL ==
CAST(key AS Array[Byte])
Method 3: I followed (https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html) to write my own protobuf deserializer, but got the error message:
Schema for type A is not supported
Those three methods are all I could find online. This should be a simple and common task, so if anyone has insight into it, please let me know.
Thanks!
Upvotes: 0
Views: 1205
Reputation: 46
The schema of the DataFrame created from the streaming source is:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
So the key and value are already Array[Byte]. You will have to perform the deserialization in DataFrame operations. For example, here is what I have for Kafka deserialization:
import scala.util.Try

import sparkSession.implicits._

sparkSession.readStream
  .format("kafka")
  .option("subscribe", topic)
  .option("kafka.bootstrap.servers", bootstrapServers)
  .load()
  .selectExpr("key", "value") // Select only the key & value columns
  .as[(Array[Byte], Array[Byte])]
  .flatMap {
    case (key, value) =>
      // Drop records whose key or value fails to deserialize
      for {
        deserializedKey <- Try(keyDeserializer.deserialize(topic, key)).toOption
        deserializedValue <- Try(valueDeserializer.deserialize(topic, value)).toOption
      } yield (deserializedKey, deserializedValue)
  }
You will need to modify that to deserialize your protobuf records.
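As a rough sketch of that modification: assuming A is a class generated by the protobuf compiler (so a static A.parseFrom(bytes) exists), you can parse the value bytes and copy the fields into a plain case class. The case class ARecord and its fields here are made up for illustration. Note this also explains the "Schema for type A is not supported" error in Method 3: Spark cannot derive an encoder for a protobuf-generated class, so you must map into a type it can encode (a case class, tuple, or primitive).

import scala.util.Try

import sparkSession.implicits._

// Hypothetical case class mirroring the fields of protobuf A.
// Spark can derive an encoder for this, unlike for A itself.
case class ARecord(id: Long, name: String)

val parsed = sparkSession.readStream
  .format("kafka")
  .option("subscribe", topic)
  .option("kafka.bootstrap.servers", bootstrapServers)
  .load()
  .select("value")
  .as[Array[Byte]]
  .flatMap { bytes =>
    // Parse the raw bytes with the generated protobuf class,
    // dropping any record that fails to parse, then copy the
    // fields into the encodable case class.
    Try(A.parseFrom(bytes)).toOption
      .map(a => ARecord(a.getId, a.getName))
  }
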
Upvotes: 1