Reputation: 2238
I am basically reading from a Kafka source and dumping each message through to my foreach processor (thanks to Jacek's page for the simple example).
If this actually works, I will perform some real business logic in the process method; however, this doesn't work yet. I believe the println doesn't show anything because it runs on the executors and there is no way to get those logs back to the driver. However, the insert into a temp table should at least work and show me that the messages are actually being consumed and processed through to the sink.
What am I missing here?
Really looking for a second set of eyes to check my effort here:
// Read the raw Kafka stream
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
  .option("subscribe", src_topic)
  .load()

// Pull the Kafka value out as a string column
val rec = stream.selectExpr("CAST(value AS STRING) AS txnJson").as[String]
val df = stream.selectExpr("CAST(value AS STRING) AS json")

// Custom sink: write each row to a table via my own connection helper
val writer = new ForeachWriter[Row] {
  val scon = new SConConnection

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(value: Row): Unit = {
    println("++++++++++++++++++++++++++++++++++++" + value.get(0))
    scon.executeUpdate("insert into rs_kafka10(miscCol) values(" + value.get(0) + ")")
  }

  override def close(errorOrNull: Throwable): Unit = {
    scon.closeConnection
  }
}

val yy = df.writeStream
  .queryName("ForEachQuery")
  .foreach(writer)
  .outputMode("append")
  .start()

yy.awaitTermination()
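As an aside, and not part of my actual attempt above: one way I could sanity-check that the topic is being consumed at all, independent of the foreach sink, would be a plain console sink on the same dataframe (the query name here is just made up for illustration).

// Hypothetical debug query: print the decoded Kafka values to the driver console
val debugQuery = df.writeStream
  .queryName("ConsoleDebugQuery")
  .format("console")
  .option("truncate", "false")
  .outputMode("append")
  .start()

debugQuery.awaitTermination()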
Upvotes: 7
Views: 7230
Reputation: 2238
Thanks to the comments from Harald and others, I found out a couple of things that led me to achieve normal processing behaviour -
hope it helps others.
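For reference, a minimal sketch of the ForeachWriter connection-handling pattern that is generally recommended (this is an illustrative assumption, not necessarily the exact change I made; SConConnection is the same user-defined helper as in the question): create the connection per partition/epoch inside open() and release it in close(), rather than holding a single instance in the serialized writer object.

// Sketch only: connection lifecycle managed in open()/close()
val writer = new ForeachWriter[Row] {
  var scon: SConConnection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    scon = new SConConnection
    true // returning true means this partition/version should be processed
  }

  override def process(value: Row): Unit = {
    scon.executeUpdate("insert into rs_kafka10(miscCol) values(" + value.get(0) + ")")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (scon != null) scon.closeConnection
  }
}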
Upvotes: 3