codewarrior

Reputation: 1034

How to publish to Kafka from Spark using Structured streaming?

I'm writing a Spark application that reads messages from a Kafka topic, looks up records in a DB, constructs new messages and publishes them to another Kafka topic. Here's what my code looks like -

val inputMessagesDataSet: Dataset[InputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "input-kafka-topic1")
  .load()
  .select($"value")
  .mapPartitions { r =>
     // parse the raw Kafka values into InputMessage records
     parseMessages(r)
  }

inputMessagesDataSet
  .writeStream
  .foreachBatch(processMessages _)
  .trigger(trigger)
  .start
  .awaitTermination

def processMessages(inputMessageDataSet: Dataset[InputMessage], batchId: Long): Unit = {
   // fetch stuff from DB and build a Dataset[OutputMessage]
   val outputMessagesDataSet: Dataset[OutputMessage] = ...
   // now queue to another kafka topic
   outputMessagesDataSet
       .writeStream
       .trigger(trigger)
       .format("kafka")
       .option("kafka.bootstrap.servers", "server1")
       .option("topic", "output-kafka-topic")
       .option("checkpointLocation", loc)
       .start
       .awaitTermination
}

But on the line outputMessagesDataSet.writeStream I get an error saying

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame

This seems to be because outputMessagesDataSet is not created with readStream. The reason I'm not constructing the Dataset[OutputMessage] inside the original mapPartitions() is that the classes needed for fetching DB records, etc. are not serializable, so Spark throws a NotSerializableException.

How do I construct a new Dataset and publish it to the output Kafka topic?

Upvotes: 0

Views: 420

Answers (1)

OneCricketeer

Reputation: 192023

foreachBatch hands you a static (batch) Dataset, so inside it you need to use write, not writeStream.
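A minimal sketch of that, reusing the topic and server names from the question (the DB lookup stays elided, and toJSON is just one way to produce the value column the Kafka sink requires):

def processMessages(inputMessageDataSet: Dataset[InputMessage], batchId: Long): Unit = {
  // fetch stuff from the DB and build the output Dataset, as in the question
  val outputMessagesDataSet: Dataset[OutputMessage] = ...

  // inside foreachBatch this is a static Dataset, so use the batch writer
  outputMessagesDataSet
    .toJSON                                          // Dataset[String] whose single column is named "value"
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "server1")
    .option("topic", "output-kafka-topic")
    .save()
}

Note that the checkpointLocation belongs on the outer writeStream (the one calling foreachBatch), not on this inner batch write.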

Alternatively, you can call writeStream.format("kafka") on the streaming Dataset directly, without using foreachBatch at all.
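For example, if the DB lookup can be expressed as a serializable per-record transformation (buildOutputMessage below is hypothetical), the whole pipeline can run as a single streaming query:

inputMessagesDataSet
  .map(m => buildOutputMessage(m))                 // hypothetical lookup/transform step
  .toJSON                                          // produces the "value" column the Kafka sink needs
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("topic", "output-kafka-topic")
  .option("checkpointLocation", loc)
  .trigger(trigger)
  .start()
  .awaitTermination()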

Upvotes: 2
