Reputation: 188
I am new to Spark. I use Structured Streaming to read data from Kafka.
I can read the data using this code in Scala:
val data = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topics)
  .option("startingOffsets", startingOffsets)
  .load()
My data in the value column are Thrift records. The streaming API delivers the data in binary format. I have seen examples of casting the data to a string or to JSON, but I cannot find any examples of how to deserialize the data to Thrift.
How can I achieve this?
Upvotes: 2
Views: 1378
Reputation: 73
Well, here is the followup solution. I can't post my own code, but here is the public code you can use, credit given to the owner/coder.
First, you need to convert the Array[Byte] in the value column to a Row by calling the convertObject function; let's call the result makeRow.
Second, you need to get the StructType schema of your Thrift class by calling the convert function; let's call the final result schema.
Then you need to register a UDF like this: val deserializer = udf((bytes: Array[Byte]) => makeRow(bytes), schema)
Note: You cannot use makeRow directly without passing the schema; otherwise Spark will complain: Schema for type org.apache.spark.sql.Row is not supported
Then you can use it in this way:
val stuff = kafkaStuff.withColumn("data", deserializer(kafkaStuff("value")))
val finalStuff = stuff.select("data.*")
And you are done! Hope this helps.
Credit also goes to the post "Spark UDF for StructType / Row", which gave me the final idea when my previous solution was already so close.
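For readers who want something concrete, the steps above might be sketched roughly as follows. This is not the public code referred to in this answer: ThriftRowUdf, newRecord and toRow are hypothetical names, and toRow stands in for whatever convertObject does for your Thrift class. The sketch assumes the records were written with Thrift's binary protocol. Also note that on Spark 3.x the udf(f, dataType) overload used here is rejected by default unless spark.sql.legacy.allowUntypedScalaUDF is set to true.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StructType
import org.apache.thrift.{TBase, TDeserializer}
import org.apache.thrift.protocol.TBinaryProtocol

// Hypothetical helper, not part of Spark or Thrift.
object ThriftRowUdf {
  // newRecord: creates an empty instance of the generated Thrift class
  // toRow:     maps a populated record to a Row that matches `schema`
  // Both functions are captured by the UDF, so they must be serializable.
  def build[T <: TBase[_, _]](
      newRecord: () => T,
      toRow: T => Row,
      schema: StructType): UserDefinedFunction = {
    val makeRow = (bytes: Array[Byte]) =>
      if (bytes == null) null // Kafka values can be null
      else {
        val record = newRecord()
        // Assumption: binary protocol. A fresh deserializer per call avoids
        // sharing a non-thread-safe object between tasks.
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(record, bytes)
        toRow(record)
      }
    // The schema must be passed explicitly because Spark cannot infer it for Row.
    udf(makeRow, schema)
  }
}
```

With a hypothetical generated class MyEvent, this could be called as ThriftRowUdf.build(() => new MyEvent(), (e: MyEvent) => Row(e.getId, e.getName), schema), and the result used exactly like deserializer above.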
Upvotes: 1
Reputation: 880
I found this blog post on the Databricks website. It shows how Spark SQL's APIs can be leveraged to consume and transform complex data streams from Apache Kafka.
There is a section explaining how a UDF can be used to deserialize rows:
object MyDeserializerWrapper {
  val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
  MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
I am using Java and therefore had to write the following sample UDF to check how it can be called from Java:
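UDF1<byte[], String> mode = new UDF1<byte[], String>() {
    public String call(byte[] bytes) throws Exception {
        String s = new String(bytes);
        return "_" + s;
    }
};
// callUDF looks the function up by name, so it has to be registered first
spark.udf().register("mode", mode, DataTypes.StringType);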
Now I can use this UDF in the structured streaming word count example, as follows:
Dataset<String> words = df
    // converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
    // .selectExpr("CAST(value AS STRING)")
    .select(callUDF("mode", col("value")))
    .as(Encoders.STRING())
    .flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        }, Encoders.STRING());
The next step for me is to write a UDF for the Thrift deserialization. I will post it as soon as I am done.
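One possible shape for such a Thrift UDF is sketched below, in Scala to match the blog snippet rather than Java. It is only a sketch under assumptions: ThriftJson, toJson and register are made-up names, newRecord must create an empty instance of your generated Thrift class, and the records are assumed to use Thrift's binary protocol. It returns each record as a JSON string via Thrift's TSimpleJSONProtocol, so an ordinary typed UDF is enough and no Row schema has to be supplied.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.thrift.{TBase, TDeserializer, TSerializer}
import org.apache.thrift.protocol.{TBinaryProtocol, TSimpleJSONProtocol}

// Hypothetical wrapper in the style of MyDeserializerWrapper.
object ThriftJson {
  // TDeserializer and TSerializer are not thread-safe, so keep one per thread.
  private val reader = new ThreadLocal[TDeserializer] {
    override def initialValue(): TDeserializer =
      new TDeserializer(new TBinaryProtocol.Factory())
  }
  private val writer = new ThreadLocal[TSerializer] {
    override def initialValue(): TSerializer =
      new TSerializer(new TSimpleJSONProtocol.Factory())
  }

  // Decodes binary Thrift bytes into a fresh record and renders it as JSON.
  def toJson[T <: TBase[_, _]](bytes: Array[Byte], newRecord: () => T): String =
    if (bytes == null) null // Kafka values can be null
    else {
      val record = newRecord()
      reader.get().deserialize(record, bytes)
      writer.get().toString(record)
    }

  // Registers the decoder under `name` so it can be used from selectExpr or callUDF.
  // newRecord is captured by the UDF and therefore must be serializable.
  def register[T <: TBase[_, _]](
      spark: SparkSession,
      name: String,
      newRecord: () => T): UserDefinedFunction =
    spark.udf.register(name, (bytes: Array[Byte]) => toJson(bytes, newRecord))
}
```

For a hypothetical generated class MyEvent, it could be registered with ThriftJson.register(spark, "thriftToJson", () => new MyEvent()) and then called like the mode UDF above. If separate columns are needed, from_json can parse the resulting string.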
Upvotes: -1