Reputation: 95
Spark Structured Streaming, trying mapGroupsWithState. Has anyone faced the situation where .format("console") works perfectly and prints the incremental state changes, but as soon as I switch to .format("anyStreamingSinkClass") the DataFrame received at the sink class only contains the current batch, with no memory of the state or of the incremental effect?
case class WordCount(word: String, count: Int)
case class WordInfo(totalSum: Int)
case class WordUpdate(word: String, count: Int, expired: Boolean)

val ds = df.as[String].map { x =>
  val arr = x.split(",", -1)
  WordCount(arr(0), arr(1).toInt)
}.groupByKey(_.word)
  .mapGroupsWithState[WordInfo, WordUpdate](GroupStateTimeout.NoTimeout()) {
    case (word: String, allWords: Iterator[WordCount], state: GroupState[WordInfo]) =>
      val events = allWords.toSeq
      val updatedSession = if (state.exists) {
        val existingState = state.get
        WordInfo(existingState.totalSum + events.map(event => event.count).sum)
      } else {
        WordInfo(events.map(event => event.count).sum)
      }
      state.update(updatedSession)
      WordUpdate(word, updatedSession.totalSum, false)
  }
val query = ds
  .writeStream
  //.format("console")
  .format("com.subhankar.streamDB.ConsoleSinkProvider")
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(3.seconds))
  //.option("truncate", false)
  .option("checkpointLocation", "out.b")
  .queryName("q2090")
  .start()

query.awaitTermination()
For the sink format I get:

Batch 21's distinct count is 1
x,1
Batch 22's distinct count is 1
x,2
Batch 23's distinct count is 1
x,3
For the console format I get:
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+-------+
|word|count|expired|
+----+-----+-------+
| x| 1| false|
+----+-----+-------+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+-------+
|word|count|expired|
+----+-----+-------+
| x| 3| false|
+----+-----+-------+
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+-------+
|word|count|expired|
+----+-----+-------+
| x| 6| false|
+----+-----+-------+
The sink does a simple print:
override def addBatch(batchId: Long, data: DataFrame) = {
  val batchDistinctCount = data.rdd.distinct.count()
  if (data.count() > 0) {
    println(s"Batch ${batchId}'s distinct count is ${batchDistinctCount}")
    println(data.map(x => x.getString(0) + "," + x.getInt(1)).collect().mkString(","))
  }
}
Upvotes: 1
Views: 592
Reputation: 1220
I had the same problem.
When I tested it on Spark 2.2.0, the state was reset and lost between mini-batches.
When I tested it on Spark 2.3.0, it instead threw an exception:
Queries with streaming sources must be executed with writeStream.start()
That exception made me realize my custom Sink contained unsupported operations.
In your case, the unsupported operation is multiple aggregations: you run data.rdd.distinct.count(), data.count(), and data.map within one mini-batch, and multiple aggregations on a streaming-sourced DataFrame are not supported.
On Spark < 2.3 such code runs but produces wrong results; on Spark >= 2.3 it simply throws the exception above.
To fix it, the following modification, which avoids multiple aggregations, gives correct results.
override def addBatch(batchId: Long, dataframe: DataFrame) = {
  // Collect once, then do everything on this local Array (beware of OUT OF MEMORY for large batches).
  val data = dataframe.collect()
  val batchDistinctCount = data.distinct.length
  if (data.length > 0) {
    println(s"Batch ${batchId}'s distinct count is ${batchDistinctCount}")
    println(data.map(x => x.getString(0) + "," + x.getInt(1)).mkString(","))
  }
}
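As a side note, if you can move to Spark 2.4 or later, the same single-pass idea can be sketched with foreachBatch instead of a custom Sink: the per-batch Dataset it hands you is a normal (non-streaming) one, so caching it and running several actions is allowed. This is only a rough sketch reusing ds and WordUpdate from the question (the writeBatch name is mine, and spark.implicits._ is assumed to be in scope):

// Rough sketch, assuming Spark 2.4+ and the ds / WordUpdate definitions from the question.
val writeBatch: (Dataset[WordUpdate], Long) => Unit = (batch, batchId) => {
  batch.persist()                                  // cache so the actions below reuse one computation
  val batchDistinctCount = batch.distinct().count()
  if (batch.count() > 0) {
    println(s"Batch ${batchId}'s distinct count is ${batchDistinctCount}")
    println(batch.map(x => x.word + "," + x.count).collect().mkString(","))
  }
  batch.unpersist()
}

val query = ds
  .writeStream
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(3.seconds))
  .option("checkpointLocation", "out.b")
  .foreachBatch(writeBatch)
  .start()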
Upvotes: 1