Vinil

Reputation: 33

Convert a Spark DataFrame Row to Avro and publish to Kafka

I have a Spark DataFrame with the schema below and am trying to stream this DataFrame to Kafka using Avro.

```
root
 |-- clientTag: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- contactPoint: struct (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- performCheck: string (nullable = true)
```

Sample Record: {"performCheck" : "N", "clientTag" :{"key":"value"}, "contactPoint": {"email":"[email protected]", "type":"EML"}}

Avro Schema:

{ "name":"Message", "namespace":"kafka.sample.avro", "type":"record", "fields":[ {"type":"string", "name":"id"}, {"type":"string", "name":"email"} {"type":"string", "name":"type"} ] }

I have a couple of questions.

  1. What is the best way to convert an org.apache.spark.sql.Row to an Avro message? I want to extract email and type from the DataFrame for each Row and use those values to construct the Avro message.
  2. Eventually, all the Avro messages will be sent to Kafka. If there is an error while producing, how can I collect all the Rows that failed to reach Kafka and return them as a DataFrame?

Thanks for the help

Upvotes: 2

Views: 2664

Answers (1)

KiranM

Reputation: 1323

You can try this.

Q#1: You can extract the child elements of the DataFrame using dot notation, like this:

```
import spark.implicits._  // for the $"col" column syntax

val dfJSON = spark.read.json("/json/path/sample_avro_data_as_json.json")
  .withColumn("id", $"clientTag.key")           // flatten the nested struct fields
  .withColumn("email", $"contactPoint.email")
  .withColumn("type", $"contactPoint.type")
```

Then you can directly use these columns while assigning values to the Avro record that you serialize and send to Kafka.
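Alternatively, if you are on a newer Spark (3.x) with the spark-avro package, you could let Spark do the Avro serialization itself and write straight to Kafka with the built-in sink. A minimal sketch, assuming the broker address and topic name shown here are placeholders:

```
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.{col, struct}

dfJSON
  .select(
    col("id").as("key"),                                                // Kafka message key
    to_avro(struct(col("id"), col("email"), col("type"))).as("value"))  // Avro-encoded message body
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-host:9092")  // placeholder broker address
  .option("topic", "sample-topic")                        // placeholder topic name
  .save()
```

Note that plain to_avro output is not in the Confluent schema-registry wire format, so consumers that rely on KafkaAvroDeserializer would not understand it; the producer-based approach below covers that case.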

Q#2: You can keep track of successes and failures with something like this. It is not fully working code, but it should give you an idea.

```
import scala.collection.JavaConverters._
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.Row
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer

dfJSON.foreachPartition( (currentPartition: Iterator[Row]) => {

  // One producer + serializer per partition
  val producer = new KafkaProducer[String, Array[Byte]](props)
  val schema: Schema = ... // Get schema from schema registry or avsc file
  val schemaRegProps = Map("schema.registry.url" -> schemaRegistryUrl)
  val client = new CachedSchemaRegistryClient(schemaRegistryUrl, Int.MaxValue)
  val valueSerializer = new KafkaAvroSerializer(client)
  valueSerializer.configure(schemaRegProps.asJava, false)

  val sendStatus = currentPartition.map(rec => {
    try {
      // Build the Avro record from the flattened columns
      val avroRecord: GenericRecord = new GenericData.Record(schema)
      avroRecord.put("id", rec.getAs[String]("id"))
      avroRecord.put("email", rec.getAs[String]("email"))
      avroRecord.put("type", rec.getAs[String]("type"))

      // Serialize the record and send it to Kafka, keyed by id
      producer.send(new ProducerRecord[String, Array[Byte]](
        kafkaTopic, rec.getAs[String]("id"), valueSerializer.serialize(kafkaTopic, avroRecord)))

      (rec.getAs[String]("id"), rec.getAs[String]("email"), rec.getAs[String]("type"), "Success")
    } catch {
      case e: Exception =>
        println("*** Exception *** ")
        e.printStackTrace()
        (rec.getAs[String]("id"), rec.getAs[String]("email"), rec.getAs[String]("type"), "Failed")
    }
  }) // note: .toDF cannot be called here inside foreachPartition; see the mapPartitions sketch below

  sendStatus.foreach(println) // the map over the iterator is lazy; this forces it
  // You can retry the failed records or log them here
  producer.close()
})
```
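For the schema placeholder above, one option (a minimal sketch, assuming the avsc from the question is saved locally; the path is hypothetical) is to parse it with Avro's Schema.Parser:

```
import scala.io.Source
import org.apache.avro.Schema

// Load the avsc shown in the question from a (hypothetical) local path and parse it
val source = Source.fromFile("/path/to/message.avsc")
val schemaJson = try source.mkString finally source.close()
val schema: Schema = new Schema.Parser().parse(schemaJson)
```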

The output printed by sendStatus.foreach(println) would look like:

(111,[email protected],EML,Success)

You can then handle those statuses however you want.
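If you specifically want the failed rows back as a DataFrame (your second question), you can switch from foreachPartition to mapPartitions so the per-row status flows back to the driver. A rough sketch with hypothetical names, reusing the per-partition setup from above:

```
import org.apache.spark.sql.Row
import spark.implicits._

val statusDF = dfJSON
  .mapPartitions { (currentPartition: Iterator[Row]) =>
    // create the producer, schema and serializer here, exactly as in foreachPartition above
    currentPartition.map { rec =>
      val status =
        try {
          // build the GenericRecord and call producer.send(...) as above
          "Success"
        } catch {
          case _: Exception => "Failed"
        }
      (rec.getAs[String]("id"), rec.getAs[String]("email"), rec.getAs[String]("type"), status)
    }
  }
  .toDF("id", "email", "type", "sent_status")

val failedDF = statusDF.filter($"sent_status" === "Failed")  // rows that did not reach Kafka
```

Also keep in mind that producer.send is asynchronous; to be sure a row really reached Kafka, you would need to check the returned Future (for example producer.send(...).get()) before marking it Success.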

Upvotes: 1
