Reputation: 33
I have a spark data frame with the below schema and trying to stream this dataframe to Kafka using Avro
```
root
 |-- clientTag: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- contactPoint: struct (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- performCheck: string (nullable = true)
```
Sample Record:
```
{"performCheck" : "N", "clientTag" : {"key":"value"}, "contactPoint": {"email":"[email protected]", "type":"EML"}}
```
Avro Schema:
```
{
  "name": "Message",
  "namespace": "kafka.sample.avro",
  "type": "record",
  "fields": [
    {"type": "string", "name": "id"},
    {"type": "string", "name": "email"},
    {"type": "string", "name": "type"}
  ]
}
```
I have a couple of questions:
1. How do I convert each org.apache.spark.sql.Row to an Avro message? I want to extract email and type from the dataframe for each Row and use those values to construct the Avro message.
2. How can I keep track of which records were sent to Kafka successfully and which failed?

Thanks for the help.
Upvotes: 2
Views: 2664
Reputation: 1323
You can try this.
Q#1: You can extract the child elements of the dataframe using dot notation:
```
val dfJSON = spark.read.json("/json/path/sample_avro_data_as_json.json") //can read from schema registry
  .withColumn("id", $"clientTag.key")
  .withColumn("email", $"contactPoint.email")
  .withColumn("type", $"contactPoint.type")
```
Then you can use these columns directly when assigning values to the Avro record that you serialize and send to Kafka.
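For example, a quick sanity check that the flattened columns look as expected (a minimal sketch, assuming the sample record above and `import spark.implicits._` for the `$` syntax):
```
// Should show one row with plain string columns: id, email, type
dfJSON.select($"id", $"email", $"type").show(false)
```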
Q#2: You can keep track of success and failure with something like this. It is not fully working code, but it should give you an idea.
```
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer
import scala.collection.JavaConverters._

dfJSON.foreachPartition(currentPartition => {
  val producer = new KafkaProducer[String, Array[Byte]](props) // props: your Kafka producer config
  val schema: Schema = ... //Get schema from schema registry or avsc file
  val schemaRegProps = Map("schema.registry.url" -> schemaRegistryUrl)
  val client = new CachedSchemaRegistryClient(schemaRegistryUrl, Int.MaxValue)
  val valueSerializer = new KafkaAvroSerializer(client)
  valueSerializer.configure(schemaRegProps.asJava, false)

  val results = currentPartition.map(rec => {
    try {
      val avroRecord: GenericRecord = new GenericData.Record(schema)
      avroRecord.put("id", rec.getAs[String]("id"))
      avroRecord.put("email", rec.getAs[String]("email"))
      avroRecord.put("type", rec.getAs[String]("type"))
      // Serialize the record into a ProducerRecord and send it to Kafka
      producer.send(new ProducerRecord[String, Array[Byte]](
        kafkaTopic,
        rec.getAs[String]("id"),
        valueSerializer.serialize(kafkaTopic, avroRecord)))
      (rec.getAs[String]("id"), rec.getAs[String]("email"), rec.getAs[String]("type"), "Success")
    } catch {
      case e: Exception =>
        println("*** Exception *** ")
        e.printStackTrace()
        (rec.getAs[String]("id"), rec.getAs[String]("email"), rec.getAs[String]("type"), "Failed")
    }
  })

  results.foreach(println) // map is lazy, so this also forces the sends
  //You can retry or log them
  producer.close()
})
```
The output would be:
```
(111,[email protected],EML,Success)
```
You can then do whatever you want with it.
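For instance, a minimal sketch of the "retry or log" step (inside foreachPartition, in place of results.foreach(println); sendToRetryTopic is a hypothetical helper you would supply):
```
// Materialize the lazy iterator once, then split the tuples by status
val (ok, failed) = results.toList.partition(_._4 == "Success")
println(s"Partition summary: ${ok.size} sent, ${failed.size} failed")
failed.foreach { case (id, email, typ, _) =>
  // Hypothetical helper: push failures to a dead-letter/retry topic or a log store
  sendToRetryTopic(id, email, typ)
}
```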
Upvotes: 1