wdmv1981

Reputation: 49

How to consume correctly from Kafka topic with Java Spark structured streaming

I am new to Kafka-Spark streaming and am trying to implement the examples from the Spark documentation with a Protocol Buffers serializer/deserializer. So far I have followed the official tutorials on

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
https://developers.google.com/protocol-buffers/docs/javatutorial

and now I am stuck on the following problem. This question might be similar to this post: How to deserialize records from Kafka using Structured Streaming in Java?

I have already successfully implemented the serializer, which writes the messages to the Kafka topic. Now the task is to consume them with Spark structured streaming using a custom deserializer.
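For reference, the serializer side (not shown here) might look roughly like the following minimal sketch; it assumes the Person class generated by the protobuf tutorial:

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerializer implements Serializer<Person> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Person person) {
        // Protobuf messages know how to render their own wire format
        return person == null ? null : person.toByteArray();
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

The custom deserializer: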

import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<Person> {

    @Override
    public Person deserialize(String topic, byte[] data) {
        try {
            // Parse the protobuf wire format back into a Person message
            return Person.parseFrom(data);
        } catch (Exception e) {
            // TODO: proper error handling (log and/or rethrow)
            return null;
        }
    }
}

Dataset<Row> dataset = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topic)
        .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        .option("value.deserializer", "de.myproject.CustomDeserializer")
        .load()
        .select("value");

dataset.writeStream()
        .format("console")
        .start()
        .awaitTermination();

But the output still shows the raw binary values.

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 AC BD BB 09 1...|
+--------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 82 EF D8 08 1...|
+--------------------+

According to the tutorial, I just need to set the value.deserializer option to get a human-readable format:

.option("value.deserializer", "de.myproject.CustomDeserializer")

Did I miss something?

Upvotes: 1

Views: 2559

Answers (2)

Bala Srinivasan

Reputation: 61

You need to cast the byte arrays to the String datatype:

dataset.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Then you can use functions.from_json(dataset.col("value"), StructType) to get back the actual DF.
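A minimal sketch of that approach, assuming the Kafka values are JSON strings and using an illustrative schema (the field names below are assumptions, not from the question; note that from_json parses JSON, so it will not decode Protocol Buffers bytes):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Illustrative schema; adjust to the fields of your message
StructType personSchema = new StructType()
        .add("id", DataTypes.LongType)
        .add("name", DataTypes.StringType)
        .add("email", DataTypes.StringType);

Dataset<Row> parsed = dataset
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .select(from_json(col("value"), personSchema).as("person"))
        .select("person.*");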

Happy Coding :)

Upvotes: 0

OneCricketeer

Reputation: 191963

Did you miss this section of the documentation?

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

  • key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
  • value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

You'll have to register a UDF that invokes your deserializer instead.

Similar to Read protobuf kafka message using spark structured streaming
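A minimal sketch of that approach, assuming the generated Person protobuf class; the UDF name deserializePerson is illustrative, and here the message is rendered as text for the console sink:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Register a UDF that applies Person.parseFrom to the raw Kafka value bytes
sparkSession.udf().register("deserializePerson",
        (UDF1<byte[], String>) bytes -> Person.parseFrom(bytes).toString(),
        DataTypes.StringType);

// "value" is still the raw binary column from the Kafka source
Dataset<Row> persons = dataset.selectExpr("deserializePerson(value) AS person");

persons.writeStream()
        .format("console")
        .start()
        .awaitTermination();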

Upvotes: 1
