Reputation: 9868
I came across Structured Streaming with Spark; it has an example of continuously consuming from an S3 bucket and writing the processed results to a MySQL DB.
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
  .writeStream.format("jdbc")
  .start("jdbc:mysql://...")
How can this be used with Spark Kafka Streaming?
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?
Upvotes: 11
Views: 4625
Reputation: 149608
Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?
Not yet. Spark 2.0.0 doesn't have Kafka sink support for Structured Streaming. This is a feature that should come out in Spark 2.1.0 according to Tathagata Das, one of the creators of Spark Streaming. Here is the relevant JIRA issue.
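Until then, the workaround is exactly the foreachRDD route the question is trying to avoid. A rough sketch of what that could look like with the direct stream from the question, assuming a SparkSession named spark is in scope; the MySQL table name and credentials are placeholders:
import java.util.Properties

// Placeholder MySQL credentials
val props = new Properties()
props.put("user", "username")
props.put("password", "password")

stream.foreachRDD { rdd =>
  import spark.implicits._
  // Pull the fields out of the ConsumerRecords, then write the micro-batch via JDBC
  val df = rdd.map(record => (record.key, record.value)).toDF("key", "value")
  df.write.mode("append").jdbc("jdbc:mysql://...", "events", props) // "events" is a placeholder table
}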
Yes, it's possible from Spark version 2.2 onwards.
stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()
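One thing this snippet glosses over: the Kafka sink publishes the value column of the DataFrame (and optionally key), and a streaming query writing to Kafka also needs a checkpoint location. A minimal end-to-end sketch along those lines, assuming Spark 2.2+; the source topic name and checkpoint path are placeholders:
val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic") // placeholder topic
  .load()

// The Kafka sink expects a string or binary `value` column (and optionally `key`)
source
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/path/to/checkpoint") // placeholder path
  .start()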
Check this SO post (read and write on a Kafka topic with Spark Streaming) for more.
Kafka 0.10 integration for Structured Streaming is now experimentally supported in Spark 2.0.2:
val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
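To get from this source back to the question's aggregation, the JSON payload in value can be parsed and grouped the same way. A sketch, assuming Spark 2.1+ (for from_json) and assuming the payload carries action and time fields (the schema below is a guess); the result goes to the console sink for illustration, since a JDBC sink is still not available out of the box:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Assumed schema of the JSON payload carried in the Kafka `value` column
val schema = new StructType()
  .add("action", StringType)
  .add("time", TimestampType)

val events = ds1
  .select(from_json(col("value").cast("string"), schema).as("event"))
  .select("event.*")

// Same windowed count as in the question, written to the console sink
events
  .groupBy(col("action"), window(col("time"), "1 hour"))
  .count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()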
Upvotes: 12
Reputation: 657
I was having a similar issue with reading from a Kafka source and writing to a Cassandra sink. I created a simple project, kafka2spark2cassandra, and am sharing it here in case it is helpful for anyone; a rough sketch of that kind of pipeline follows.
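This is not the project itself, just a sketch of the same shape of pipeline on Spark 2.4+ with the DataStax spark-cassandra-connector, using foreachBatch (my choice here, not necessarily what the project does); broker, topic, keyspace, table and checkpoint path are placeholders:
import org.apache.spark.sql.DataFrame

val kafkaSource = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// Write each micro-batch to Cassandra through the spark-cassandra-connector
val writeToCassandra: (DataFrame, Long) => Unit = (batch, _) =>
  batch.write
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "my_keyspace") // placeholder keyspace
    .option("table", "my_table")       // placeholder table
    .mode("append")
    .save()

kafkaSource.writeStream
  .foreachBatch(writeToCassandra)
  .option("checkpointLocation", "/path/to/checkpoint") // placeholder path
  .start()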
Upvotes: 3