Reputation: 37
I'm trying to write a DataFrame from Spark to Kafka and I couldn't find any solution out there. Can you please show me how to do that?
Here is my current code:
activityStream.foreachRDD { rdd =>
  val activityDF = rdd
    .toDF()
    .selectExpr(
      "timestamp_hour", "referrer", "action",
      "prevPage", "page", "visitor", "product", "inputProps.topic as topic")
  val producerRecord = new ProducerRecord(topicc, activityDF)
  kafkaProducer.send(producerRecord) // <--- this shows an error
}
type mismatch;
 found   : org.apache.kafka.clients.producer.ProducerRecord[Nothing,org.apache.spark.sql.DataFrame]
    (which expands to)  org.apache.kafka.clients.producer.ProducerRecord[Nothing,org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]
 required: org.apache.kafka.clients.producer.ProducerRecord[Nothing,String]
Error occurred in an application involving default arguments.
Upvotes: 0
Views: 2274
Reputation: 74669
Do collect on the activityDF to get the records (not Dataset[Row]) and save them to Kafka.
Note that you'll end up with a collection of records after collect, so you probably have to iterate over it, e.g.
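val activities = activityDF.collect()
// the following is pure Scala and has nothing to do with Spark
activities.foreach { a: Row =>
  // map a to a ProducerRecord; sending the row as a comma-separated string is
  // just one possible mapping (the types follow the error message in the question)
  val pr = new ProducerRecord[Nothing, String](topicc, a.mkString(","))
  kafkaProducer.send(pr)
}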
Use pattern matching on Row to destructure it into fields/columns, e.g.
activities.foreach {
  case Row(timestamp_hour, referrer, action, prevPage, page, visitor, product, topic) =>
    // ...transform a to ProducerRecord
    kafkaProducer.send(pr)
}
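To make the "transform to ProducerRecord" step concrete, here is a minimal sketch. The helper name toRecord, the comma-separated value format, and the String key type are my assumptions, not something the question specifies; the topic is taken from the last column, as in the selectExpr of the question.

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.sql.Row

// Hypothetical helper: turns one collected Row into a Kafka record.
// Assumes the eight columns from the question, with topic as the last one.
def toRecord(row: Row): Option[ProducerRecord[String, String]] = row match {
  case Row(timestampHour, referrer, action, prevPage, page, visitor, product, topic: String) =>
    // Assumed value format: the remaining columns joined with commas
    val value = Seq(timestampHour, referrer, action, prevPage, page, visitor, product).mkString(",")
    Some(new ProducerRecord[String, String](topic, value))
  case _ =>
    // Rows with a different shape or a null topic are skipped in this sketch
    None
}
```

With a KafkaProducer[String, String] this could be used as activityDF.collect().flatMap(r => toRecord(r)).foreach(r => kafkaProducer.send(r)). It still runs entirely on the driver.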
PROTIP: I'd strongly suggest using a case class and transforming the DataFrame (= Dataset[Row]) to Dataset[YourCaseClass].
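A rough sketch of that conversion follows. The case class Activity and the helper toActivities are hypothetical names, and the field types are guesses at the real schema; the field names have to match the selected column names for as[...] to resolve them.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Hypothetical case class; define it at the top level (not inside a method)
// so Spark can derive an encoder for it. Field types are assumptions.
case class Activity(
  timestamp_hour: Long, referrer: String, action: String, prevPage: String,
  page: String, visitor: String, product: String, topic: String)

def toActivities(activityDF: DataFrame): Dataset[Activity] = {
  val spark = activityDF.sparkSession
  import spark.implicits._ // brings the implicit Encoder[Activity] into scope
  activityDF.as[Activity]
}
```

After that, each element is a plain Activity, so fields are accessed by name (a.topic, a.page) instead of by position in a Row.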
See Spark SQL's Row and Kafka's ProducerRecord docs.
As Joe Nate pointed out in the comments:
If you do "collect" before writing to any endpoint, it's going to make all the data aggregate at the driver and then make the driver write it out. 1) Can crash the driver if too much data (2) no parallelism in write.
That's 100% correct. I wish I had said it :)
You may want to use the approach as described in Writing Stream Output to Kafka instead.
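For the DataFrame built inside foreachRDD, a sketch of that approach could look like the following. It assumes Spark 2.2 or later with the spark-sql-kafka-0-10 package on the classpath; the helper names, the JSON value format and the broker address parameter are my assumptions. The Kafka sink expects a string or binary value column and, when no topic option is set, a topic column.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: shapes the DataFrame into the columns the Kafka sink reads.
// The JSON encoding of the value is an assumption; any string or binary value works.
def toKafkaColumns(activityDF: DataFrame): DataFrame =
  activityDF.selectExpr(
    "topic",
    "to_json(struct(timestamp_hour, referrer, action, prevPage, page, visitor, product)) AS value")

// Writes from the executors in parallel, without collecting to the driver.
// bootstrapServers is a placeholder such as "host1:9092".
def writeToKafka(activityDF: DataFrame, bootstrapServers: String): Unit =
  toKafkaColumns(activityDF)
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .save()
```

Because each row carries its own topic column, no separate topic option is needed here, and the separate KafkaProducer instance is no longer required.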
Upvotes: 1