Reputation: 49
I am new to kafka-spark streaming and trying to implement the examples from spark documentation with a Protocol buffer serializer/deserializer. So far I followed the official tutorials on
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html https://developers.google.com/protocol-buffers/docs/javatutorial
and now I stuck on with the following problem. This question might be similar with this post How to deserialize records from Kafka using Structured Streaming in Java?
I already implemented successful the serializer which writes the messages on the kafka topic. Now the task is to consume it with spark structured streaming with a custom deserializer.
public class CustomDeserializer implements Deserializer<Person> {
@Override
public Person deserialize(String topic, byte[] data) {
Person person = null;
try {
person = Person.parseFrom(data);
return person;
} catch (Exception e) {
//ToDo
}
return null;
}
Dataset<Row> dataset = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", topic)
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.option("value.deserializer", "de.myproject.CustomDeserializer")
.load()
.select("value");
dataset.writeStream()
.format("console")
.start()
.awaitTermination();
But as output I still get the binaries.
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
| value|
+--------------------+
|[08 AC BD BB 09 1...|
+--------------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
| value|
+--------------------+
|[08 82 EF D8 08 1...|
+--------------------+
Regarding the tutorial I just need to put the option for the value.deserializer to have a human readable format
.option("value.deserializer", "de.myproject.CustomDeserializer")
Did I miss something?
Upvotes: 1
Views: 2559
Reputation: 61
You need to convert byte to String datatype. dataset.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Then you can use functions.from_json(dataset.col("value"), StructType) to get back the actual DF.
Happy Coding :)
Upvotes: 0
Reputation: 191963
Did you miss this section of the documentation?
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
- key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
- value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
You'll have to register a UDF that invokes your deserializers instead
Similar to Read protobuf kafka message using spark structured streaming
Upvotes: 1