subhankar

Reputation: 95

Structured Streaming mapGroupWithState not working for custom sink

Spark Structured Streaming, trying mapGroupsWithState. Has anyone faced the situation where .format("console") works perfectly and prints the incremental state changes, but as soon as I change to .format("anyStreamingSinkClass"), the DataFrame received at the sink class only holds the current batch, with no memory of the state or of the incremental effect?

case class WordCount(word:String,count:Int)
case class WordInfo(totalSum:Int)
case class WordUpdate(word:String,count:Int,expired:Boolean)


val ds = df.as[String].map { x =>
  val arr = x.split(",", -1)
  WordCount(arr(0), arr(1).toInt)
}.groupByKey(_.word)
  .mapGroupsWithState[WordInfo, WordUpdate](GroupStateTimeout.NoTimeout()) {
    case (word: String, allWords: Iterator[WordCount], state: GroupState[WordInfo]) =>
      val events = allWords.toSeq
      // add this batch's counts onto the existing state, or start a fresh state
      val updatedSession = if (state.exists) {
        val existingState = state.get
        WordInfo(existingState.totalSum + events.map(event => event.count).sum)
      } else {
        WordInfo(events.map(event => event.count).sum)
      }
      state.update(updatedSession)

      WordUpdate(word, updatedSession.totalSum, false)
  }


val query = ds
  .writeStream
  //.format("console")
  .format("com.subhankar.streamDB.ConsoleSinkProvider")
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(3.seconds))
  //.option("truncate", false)
  .option("checkpointLocation", "out.b")
  .queryName("q2090")
  .start()

query.awaitTermination()

For the custom sink format I get

Batch 21's distinct count is 1
x,1
Batch 22's distinct count is 1
x,2
Batch 23's distinct count is 1
x,3

For the console format I get

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+-------+
|word|count|expired|
+----+-----+-------+
|   x|    1|  false|
+----+-----+-------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+-------+
|word|count|expired|
+----+-----+-------+
|   x|    3|  false|
+----+-----+-------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+-------+
|word|count|expired|
+----+-----+-------+
|   x|    6|  false|
+----+-----+-------+

The sink does a simple print:

override def addBatch(batchId: Long, data: DataFrame): Unit = {
  val batchDistinctCount = data.rdd.distinct.count()
  if (data.count() > 0) {
    println(s"Batch ${batchId}'s distinct count is ${batchDistinctCount}")
    println(data.map(x => x.getString(0) + "," + x.getInt(1)).collect().mkString(","))
  }
}
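
For reference, a class named in .format(...) on Spark 2.x has to implement StreamSinkProvider and return a Sink whose addBatch is the method above. A minimal sketch of that wiring (the real com.subhankar.streamDB.ConsoleSinkProvider is not reproduced here, so the layout below is only indicative):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Sketch of the provider wiring behind .format("com.subhankar.streamDB.ConsoleSinkProvider");
// only the structure is shown here.
class ConsoleSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new ConsoleSink
}

class ConsoleSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // the simple print shown above goes here
  }
}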

Upvotes: 1

Views: 592

Answers (1)

John Lin

Reputation: 1220

I had the same problem as yours.

When I tested it on Spark 2.2.0, the state got reset and lost between mini-batches.

Then I tested it on Spark 2.3.0, and the result was a thrown exception:

Queries with streaming sources must be executed with writeStream.start()

Through this exception I figured out that my custom Sink had unsupported operations.

In your case, the unsupported operation is multiple aggregations.

You have data.rdd.distinct.count(), data.count() and data.map in one mini-batch; these count as multiple aggregations on a streaming Dataset and are considered unsupported.

Although on Spark < 2.3 your code can run and just produce wrong results, on Spark >= 2.3 it simply throws the exception.

To fix it, the following modification, which avoids multiple aggregations, produces correct results.

override def addBatch(batchId: Long, dataframe: DataFrame): Unit = {
  val data = dataframe.collect()  // now do everything on this Array (watch out for OUT OF MEMORY)
  val batchDistinctCount = data.distinct.length
  if (data.length > 0) {
    println(s"Batch ${batchId}'s distinct count is ${batchDistinctCount}")
    println(data.map(x => x.getString(0) + "," + x.getInt(1)).mkString(","))
  }
}
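
As an aside (this goes beyond the original answer and is not something the question uses): starting with Spark 2.4, the same print logic can live in foreachBatch, which hands the function a plain batch Dataset, so running several actions on it is permitted (they may recompute the batch unless you cache it). A sketch, reusing the ds and WordUpdate from the question:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical Spark 2.4+ variant: foreachBatch receives an ordinary Dataset per micro-batch.
val printBatch: (Dataset[WordUpdate], Long) => Unit = (batch, batchId) => {
  if (batch.count() > 0) {
    println(s"Batch ${batchId}'s distinct count is ${batch.distinct().count()}")
    println(batch.collect().map(x => s"${x.word},${x.count}").mkString(","))
  }
}

val query2 = ds.writeStream
  .outputMode(OutputMode.Update())
  .option("checkpointLocation", "out.b2")  // hypothetical checkpoint path
  .foreachBatch(printBatch)
  .start()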

Upvotes: 1
