Raghav

Structured Streaming - Foreach Sink

I am reading from a Kafka source and passing each message through to my foreach processor (thanks to Jacek's page for the simple example).

If this works, I'll put some business logic in the process method; however, it doesn't work. I believe the println output doesn't show up because process runs on the executors, and their stdout is not sent back to the driver. Still, the insert into a temp table should at least work and show me that the messages are actually consumed and processed through to the sink.

What am I missing here?

I'd really appreciate a second pair of eyes on my attempt:

    import org.apache.spark.sql.{ForeachWriter, Row}
    import spark.implicits._

    val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
      .option("subscribe", src_topic)
      .load()

    val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[String]

    val df = stream.selectExpr("cast (value as string) as json")

    val writer = new ForeachWriter[Row] {
      val scon = new SConConnection
      override def open(partitionId: Long, version: Long): Boolean = {
        true
      }
      override def process(value: Row): Unit = {
        println("++++++++++++++++++++++++++++++++++++" + value.get(0))
        scon.executeUpdate("insert into rs_kafka10(miscCol) values(" + value.get(0) + ")")
      }
      override def close(errorOrNull: Throwable): Unit = {
        scon.closeConnection
      }
    }

    val yy = df.writeStream
      .queryName("ForEachQuery")
      .foreach(writer)
      .outputMode("append")
      .start()

    yy.awaitTermination()

Answers (1)

Raghav

Thanks to comments from Harald and others, I found a couple of things that got processing working normally:

  1. Test the code in local mode; YARN isn't much help for debugging.
  2. For some reason, calling other methods from the foreach sink's process method didn't work for me. When I put my business logic directly in process, it works.
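Regarding point 2, one plausible explanation (not confirmed in this thread) is that in the question's code `new SConConnection` runs on the driver when the `ForeachWriter` is constructed, and the writer is then serialized to the executors, so the connection that `process` calls either fails to serialize or arrives without a live connection. The Spark Structured Streaming guide recommends doing initialization such as opening a connection inside `open`. The sketch below follows that pattern. `SqlSink`, `RecordingSink`, and `JsonInsertWriter` are hypothetical names: `SqlSink` stands in for `SConConnection` (only the two calls used in the question are modeled), and `RecordingSink` lets the writer be exercised without a database. It also quotes the JSON value as a SQL string literal, which the original insert does not do.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for SConConnection: models only executeUpdate and closeConnection.
trait SqlSink {
  def executeUpdate(sql: String): Int
  def closeConnection(): Unit
}

// Hypothetical in-memory sink that records statements instead of hitting a database.
class RecordingSink extends SqlSink {
  val statements = ArrayBuffer.empty[String]
  var closed = false
  def executeUpdate(sql: String): Int = { statements += sql; 1 }
  def closeConnection(): Unit = closed = true
}

// Only the factory function is serialized with the writer; the sink itself
// is created in open(), i.e. on the executor processing the partition.
class JsonInsertWriter(newSink: () => SqlSink) extends ForeachWriter[Row] {
  @transient private var sink: SqlSink = null

  override def open(partitionId: Long, epochId: Long): Boolean = {
    sink = newSink()
    true
  }

  override def process(value: Row): Unit = {
    // Wrap the JSON text in single quotes, doubling any embedded quotes.
    val json = value.getString(0).replace("'", "''")
    sink.executeUpdate(s"insert into rs_kafka10(miscCol) values('$json')")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (sink != null) sink.closeConnection()
  }
}
```

In the streaming query this would be used as `df.writeStream.foreach(new JsonInsertWriter(() => ...))`, where the factory builds a real connection wrapped in an `SqlSink` adapter (also hypothetical). If `SConConnection` supports prepared statements, binding the value as a parameter is safer than quoting it by hand.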

Hope it helps others.
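Point 1 above, testing in local mode, might look like the minimal sketch below. With `master("local[*]")` the executor threads run inside the driver JVM, so `println` output from `process` appears in the same console, and Spark's built-in console sink is a quick way to confirm messages are being read before any foreach logic is involved. The object name `LocalKafkaDebug`, the broker `localhost:9092`, and the topic `src_topic` are placeholders, and the Kafka source needs the `spark-sql-kafka-0-10` package on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object LocalKafkaDebug {
  // local[*] runs driver and executor threads in one JVM, so println
  // output from ForeachWriter.process shows up in this console.
  def buildSession(): SparkSession =
    SparkSession.builder()
      .appName("kafka-foreach-debug")
      .master("local[*]")
      .getOrCreate()

  // Reads the Kafka value column as a JSON string, as in the question.
  def kafkaJson(spark: SparkSession, brokers: String, topic: String): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING) AS json")

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    // The console sink prints each micro-batch, confirming messages are consumed.
    val query = kafkaJson(spark, "localhost:9092", "src_topic") // placeholder broker and topic
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("append")
      .start()
    query.awaitTermination()
  }
}
```

Once rows appear in the console output, the console sink can be swapped back for `.foreach(writer)`.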