teddy

Reputation: 423

Parsing protobuf messages from Apache Kafka with Structured Streaming

I am trying to write a Kafka consumer (of a protobuf message) using Structured Streaming. Let's call the protobuf message A; it arrives in Scala as a byte array (Array[Byte]) and needs to be deserialized from there.

I have tried every method I could find online but still cannot figure out how to correctly parse message A.

Method 1: Following the integration guide (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html), I cast value as a String. But even when I call getBytes to convert the string back to bytes so I can parse message A, I get:

Exception in thread "main" java.lang.ExceptionInInitializerError
...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8

Method 2: I tried to cast value directly to a byte array, and got:

missing ')' at '['(line 1, pos 17)

== SQL ==
CAST(key AS Array[Byte])

Method 3: I followed (https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html) and wrote my own protobuf deserializer, but got this error message:

Schema for type A is not supported

Those three are probably all the methods to be found online. This should be a simple and common problem, so if anyone has insight into it, please let me know.

Thanks!

Upvotes: 0

Views: 1205

Answers (1)

The schema of the DataFrame created from the streaming source is:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
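
You can verify this yourself by calling printSchema() on the loaded DataFrame (this works before the streaming query is started) — a minimal sketch, assuming the same topic and bootstrapServers values as in the example below:

  val raw = sparkSession.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topic)
    .load()

  raw.printSchema() // prints the root schema shown above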

So key and value actually arrive as Array[Byte]; you will have to perform the deserialization in DataFrame operations.
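
As an aside, this is also why your Method 2 failed: Spark SQL's CAST expects a SQL type name such as BINARY or STRING, not the Scala type Array[Byte]. And since value already is binary, no cast is needed at all; if you did want one, it would look like:

  df.selectExpr("CAST(key AS STRING)", "value") // value is already BINARY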

For example, I have this for generic Kafka deserialization:

  import sparkSession.implicits._
  import scala.util.Try

  // keyDeserializer and valueDeserializer are assumed to be
  // org.apache.kafka.common.serialization.Deserializer instances.
  sparkSession.readStream
    .format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", bootstrapServers)
    .load()
    .selectExpr("key", "value") // selecting only key & value
    .as[(Array[Byte], Array[Byte])]
    .flatMap {
      case (key, value) =>
        // Try(...).toOption silently drops records that fail to deserialize
        for {
          deserializedKey <- Try {
            keyDeserializer.deserialize(topic, key)
          }.toOption
          deserializedValue <- Try {
            valueDeserializer.deserialize(topic, value)
          }.toOption
        } yield (deserializedKey, deserializedValue)
    }

You will need to modify that to deserialize your protobuf records.
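
For protobuf specifically, a minimal sketch follows. It assumes A is the class protoc generated for your message (exposing the standard parseFrom factory), and getId/getName are hypothetical field accessors. The important part is mapping the parsed message into a tuple (or case class) of plain fields rather than returning A itself, because Spark cannot derive an Encoder for a generated protobuf class — which is exactly what the "Schema for type A is not supported" error from your Method 3 means:

  import scala.util.Try
  import sparkSession.implicits._

  sparkSession.readStream
    .format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", bootstrapServers)
    .load()
    .select("value")
    .as[Array[Byte]]
    .flatMap { bytes =>
      // Parse the raw bytes; records that fail to parse are dropped.
      // getId/getName are hypothetical accessors on your message: extract
      // fields into a Spark-encodable tuple instead of returning A itself.
      Try(A.parseFrom(bytes)).toOption.map(a => (a.getId, a.getName))
    }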

Upvotes: 1
