I am writing a streaming Dataset to both a Kafka topic and HBase. For Kafka, I use the following:
dataset.selectExpr("id as key", "to_json(struct(*)) as value")
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", Settings.KAFKA_URL)
  .option("topic", Settings.KAFKA_TOPIC2)
  .option("checkpointLocation", "/usr/local/Cellar/zookeepertmp")
  .outputMode(OutputMode.Complete())
  .start()
For HBase, I do something like this:
dataset.writeStream.outputMode(OutputMode.Complete())
  .foreach(new ForeachWriter[Row] {
    override def process(r: Row): Unit = {
      //my logic
    }
    override def close(errorOrNull: Throwable): Unit = {}
    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }
  }).start().awaitTermination()
This writes to HBase as expected, but it doesn't always write to the Kafka topic. I am not sure why that is happening.
Upvotes: 1
Views: 1684
Use foreachBatch. From the Spark Structured Streaming documentation:
If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(…).save(…) // location 1
  batchDF.write.format(…).save(…) // location 2
  batchDF.unpersist()
}
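Applied to the two sinks in the question, the outline might look roughly like the sketch below. It assumes Spark 2.4 or later (where foreachBatch is available) and the spark-sql-kafka connector on the classpath. DualSink, toKafkaRecords, writeRowsToHBase and start are hypothetical names made up for illustration, and the HBase part is only a placeholder for the asker's own logic. Casting id to a string is also an assumption, made because the Kafka sink expects string or binary keys.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object DualSink {
  // Shape a batch into the key/value columns the Kafka sink expects.
  // Assumes the batch has an `id` column, as in the question.
  def toKafkaRecords(batchDF: DataFrame): DataFrame =
    batchDF.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")

  // Hypothetical placeholder: put the HBase write logic from the question here.
  def writeRowsToHBase(rows: Iterator[Row]): Unit =
    rows.foreach(_ => ())

  def start(dataset: DataFrame, kafkaUrl: String, topic: String,
            checkpointDir: String): StreamingQuery = {
    // A typed function value sidesteps the overload ambiguity that a bare
    // lambda passed to foreachBatch can hit on Scala 2.12.
    val writeBatch: (DataFrame, Long) => Unit = (batchDF, _) => {
      batchDF.persist() // avoid recomputing the batch for the second sink
      try {
        // Sink 1: Kafka, using the batch writer
        toKafkaRecords(batchDF).write
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaUrl)
          .option("topic", topic)
          .save()
        // Sink 2: HBase, one partition at a time
        batchDF.rdd.foreachPartition(writeRowsToHBase _)
      } finally {
        batchDF.unpersist()
      }
    }

    dataset.writeStream
      .outputMode(OutputMode.Complete()) // same mode as in the question
      .option("checkpointLocation", checkpointDir)
      .foreachBatch(writeBatch)
      .start()
  }
}
```

With this shape, a single query and a single checkpoint drive both writes, so every micro-batch reaches both sinks. Note that foreachBatch gives at-least-once guarantees by default, so a retried batch may be written to Kafka more than once.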
Upvotes: 1