Reputation: 1034
I'm writing a Spark application that reads messages from a Kafka topic, looks up records in a DB, constructs new messages, and publishes them to another Kafka topic. Here's what my code looks like:
val inputMessagesDataSet: Dataset[InputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "input-kafka-topic1")
  .load()
  .select($"value")
  .mapPartitions { r =>
    // parse the raw Kafka values into typed messages
    val messages: Iterator[InputMessage] = parseMessages(r)
    messages
  }
inputMessagesDataSet
  .writeStream
  .foreachBatch(processMessages _)
  .trigger(trigger)
  .start()
  .awaitTermination()
def processMessages(inputMessageDataSet: Dataset[InputMessage], batchId: Long): Unit = {
  // fetch stuff from DB and build a Dataset[OutputMessage]
  val outputMessagesDataSet: Dataset[OutputMessage] = ...
  // now queue to another kafka topic
  outputMessagesDataSet
    .writeStream
    .trigger(trigger)
    .format("kafka")
    .option("kafka.bootstrap.servers", "server1")
    .option("topic", "output-kafka-topic")
    .option("checkpointLocation", loc)
    .start()
    .awaitTermination()
}
But I get an error saying

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;

on the line outputMessagesDataSet.writeStream. This seems to be because outputMessagesDataSet is not created using readStream.
The reason I'm not constructing the Dataset[OutputMessage] in the original mapPartitions() is that the classes needed for fetching DB records, etc. are not serializable, so Spark throws a NotSerializableException.
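For example, something along these lines fails at task serialization time (DbClient here is a hypothetical stand-in for those DB classes):

// hypothetical, non-Serializable DB client
class DbClient { def lookup(m: InputMessage): OutputMessage = ??? }

val dbClient = new DbClient // created on the driver
inputMessagesDataSet.mapPartitions { msgs =>
  msgs.map(dbClient.lookup) // closure captures dbClient => NotSerializableException
}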
How do I construct a new Dataset and queue it to Kafka?
Upvotes: 0
Views: 420
Reputation: 192023
foreachBatch hands you a static Dataset, so inside it you need to use write, not writeStream.
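A minimal sketch of that fix, reusing the question's setup (buildOutputMessages is a hypothetical helper standing in for the DB lookup; toJSON is one way to produce the string "value" column the Kafka sink expects):

import org.apache.spark.sql.Dataset

def processMessages(batch: Dataset[InputMessage], batchId: Long): Unit = {
  // hypothetical helper: fetch DB records and build the output messages
  // (the foreachBatch function body itself runs on the driver)
  val outputMessages: Dataset[OutputMessage] = buildOutputMessages(batch)
  outputMessages
    .toJSON // Dataset[String], written to Kafka as the "value" column
    .write  // batch write: the Dataset inside foreachBatch is static
    .format("kafka")
    .option("kafka.bootstrap.servers", "server1")
    .option("topic", "output-kafka-topic")
    .save()
}

inputMessagesDataSet
  .writeStream
  .foreachBatch(processMessages _)
  .option("checkpointLocation", loc) // checkpointing belongs to the streaming query
  .trigger(trigger)
  .start()
  .awaitTermination()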
Alternatively, you can writeStream.format("kafka") on the streaming Dataset directly, without using foreachBatch.
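A sketch of that alternative, assuming the output records can be built with serializable code (buildOutputMessage is a hypothetical per-record mapping; given the non-serializable DB classes in the question, foreachBatch is probably the better fit here):

inputMessagesDataSet
  .map(buildOutputMessage _) // must be serializable, since it runs on executors
  .toJSON
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("topic", "output-kafka-topic")
  .option("checkpointLocation", loc)
  .trigger(trigger)
  .start()
  .awaitTermination()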
Upvotes: 2