Drissi Yazami

Reputation: 37

How to write DataFrame (built from RDD inside foreach) to Kafka?

I'm trying to write a DataFrame from Spark to Kafka and I couldn't find any solution out there. Can you please show me how to do that?

Here is my current code:

activityStream.foreachRDD { rdd =>
  val activityDF = rdd
    .toDF()
    .selectExpr(
      "timestamp_hour", "referrer", "action",
      "prevPage", "page", "visitor", "product", "inputProps.topic as topic")

  val producerRecord = new ProducerRecord(topicc, activityDF)

  kafkaProducer.send(producerRecord) // <--- this shows an error
}

type mismatch;
 found   : org.apache.kafka.clients.producer.ProducerRecord[Nothing,org.apache.spark.sql.DataFrame]
    (which expands to)  org.apache.kafka.clients.producer.ProducerRecord[Nothing,org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]
 required: org.apache.kafka.clients.producer.ProducerRecord[Nothing,String]
Error occurred in an application involving default arguments.

Upvotes: 0

Views: 2274

Answers (1)

Jacek Laskowski

Reputation: 74669

Do collect on activityDF to get the records themselves (not a Dataset[Row]) and save them to Kafka.

Note that you'll end up with a local collection of records after collect, so you'll have to iterate over it, e.g.

import org.apache.spark.sql.Row
import org.apache.kafka.clients.producer.ProducerRecord

val activities = activityDF.collect()
// the following is pure Scala and has nothing to do with Spark
activities.foreach { a: Row =>
  val pr: ProducerRecord[String, String] = ??? // map the Row a to a ProducerRecord here
  kafkaProducer.send(pr)
}

Use pattern matching on Row to destructure it to fields/columns, e.g.

activities.foreach { case Row(timestamp_hour, referrer, action, prevPage, page, visitor, product, topic) =>
  // build a ProducerRecord from the destructured columns, e.g. with a simple comma-separated value
  val value = Seq(timestamp_hour, referrer, action, prevPage, page, visitor, product).mkString(",")
  val pr = new ProducerRecord[String, String](topic.toString, value)
  kafkaProducer.send(pr)
}

PROTIP: I'd strongly suggest using a case class and transforming the DataFrame (= Dataset[Row]) into a Dataset[YourCaseClass].
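
A minimal sketch of that approach, assuming a hypothetical Activity case class whose fields match the selected columns (the name Activity, the String field types, and the spark variable used for the implicits import are assumptions for illustration, not part of the original code):

import org.apache.kafka.clients.producer.ProducerRecord

// hypothetical case class mirroring the selected columns
case class Activity(
  timestamp_hour: String, referrer: String, action: String,
  prevPage: String, page: String, visitor: String, product: String, topic: String)

import spark.implicits._  // provides the Encoder needed by .as[Activity]

activityDF
  .as[Activity]  // Dataset[Row] => Dataset[Activity]
  .collect()
  .foreach { a =>
    val value = Seq(a.timestamp_hour, a.referrer, a.action, a.prevPage,
                    a.page, a.visitor, a.product).mkString(",")
    kafkaProducer.send(new ProducerRecord[String, String](a.topic, value))
  }

Working with Activity instead of Row gives you compile-time field names and types rather than positional Any values.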

See Spark SQL's Row and Kafka's ProducerRecord docs.


As Joe Nate pointed out in the comments:

If you do "collect" before writing to any endpoint, all the data is aggregated at the driver and the driver then writes it out. (1) This can crash the driver if there is too much data; (2) there is no parallelism in the write.

That's 100% correct. I wish I had said it :)

You may want to use the approach described in Writing Stream Output to Kafka instead.
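
A minimal sketch of that parallel alternative, assuming Spark 2.2+ with the spark-sql-kafka-0-10 artifact on the classpath (the broker address is a placeholder): the executors write their own partitions straight to Kafka, so nothing is collected on the driver.

import org.apache.spark.sql.functions.{col, struct, to_json}

activityStream.foreachRDD { rdd =>
  rdd.toDF()
    .select(
      col("inputProps.topic") as "topic",  // per-record target topic
      to_json(struct(                      // the Kafka sink requires a string or binary "value" column
        col("timestamp_hour"), col("referrer"), col("action"),
        col("prevPage"), col("page"), col("visitor"), col("product"))) as "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder brokers
    .save()
}

JSON is used for the value column here only as an example; any transformation that yields a string or binary value column works.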

Upvotes: 1
