Reputation: 797
Using Spark Streaming (1.6) I have a file stream for reading lookup data with a 2-second batch size, but files are copied to the directory only every hour.
Once there is a new file, its content is read by the stream; this is what I want to cache in memory and keep there until new files are read.
There is another stream I want to join this dataset to, which is why I would like to cache it.
This is a follow-up question to Batch lookup data for Spark streaming.
The answer there works fine with updateStateByKey, but I don't know how to handle the case where a KV pair is deleted from the lookup files, since the sequence of values accumulated by updateStateByKey keeps growing (see the sketch below). Any hint on how to do this with mapWithState would also be great.
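Roughly, the updateStateByKey usage I mean looks like this (the "key,value" line format and the parsing are my assumptions for illustration; dictionaryStream is the file stream shown further down): every batch appends the new values to the state, so deleted pairs never drop out.
val kvStream: DStream[(String, String)] = dictionaryStream.map { line =>
  val Array(k, v) = line.split(",", 2) // assumed "key,value" line format
  (k, v)
}

// State grows monotonically: values are only ever appended.
val lookupState: DStream[(String, Seq[String])] =
  kvStream.updateStateByKey[Seq[String]] {
    (newValues: Seq[String], state: Option[Seq[String]]) =>
      Some(state.getOrElse(Seq.empty) ++ newValues)
  }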
This is what I tried so far, but the data doesn't seem to be persisted:
val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD { x =>
  if (!x.partitions.isEmpty) {
    x.unpersist(true)
    x.persist()
  }
}
Upvotes: 2
Views: 1781
Reputation: 330093
DStreams can be persisted directly using the persist method, which persists every RDD in the stream:
dictionaryStream.persist
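If the default storage level does not fit, persist also accepts an explicit StorageLevel; the level used here is only an example:
import org.apache.spark.storage.StorageLevel

dictionaryStream.persist(StorageLevel.MEMORY_AND_DISK)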
According to the official documentation, persistence is applied automatically for window-based operations like reduceByWindow and reduceByKeyAndWindow and for state-based operations like updateStateByKey, so there should be no need for explicit caching in your case. There is also no need for manual unpersisting. To quote the docs once again:
by default, all input data and persisted RDDs generated by DStream transformations are automatically cleared
and a retention period is tuned automatically based on the transformations which are used in the pipeline.
Regarding mapWithState, you'll have to provide a StateSpec. A minimal example requires a function which takes a key, an Option of the current value, and the previous state. Let's say you have a DStream[(String, Double)] and you want to record the maximum value seen so far:
val state = StateSpec.function(
  (key: String, current: Option[Double], state: State[Double]) => {
    val max = Math.max(
      current.getOrElse(Double.MinValue),
      state.getOption.getOrElse(Double.MinValue)
    )
    state.update(max)
    (key, max)
  }
)

val inputStream: DStream[(String, Double)] = ???

inputStream.mapWithState(state).print()
It is also possible to provide an initial state, a timeout interval, and to capture the current batch time. The last two can be used to implement a removal strategy for keys which haven't been updated for some period of time, as sketched below.
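For example, a minimal sketch (the initial snapshot, the timeout duration, and the key names are just assumptions) could use the four-argument variant of StateSpec.function, which also receives the batch time and returns an Option, so timed-out keys can simply be dropped from the output:
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, State, StateSpec, Time}

// Hypothetical initial snapshot of the lookup data.
val initial: RDD[(String, Double)] = ssc.sparkContext.parallelize(
  Seq(("foo", 1.0), ("bar", 2.0))
)

val spec = StateSpec.function(
  (batchTime: Time, key: String, current: Option[Double], state: State[Double]) => {
    if (state.isTimingOut()) {
      // The key received no update within the timeout; Spark removes it
      // from the state, so we simply don't emit anything for it.
      None
    } else {
      val max = Math.max(
        current.getOrElse(Double.MinValue),
        state.getOption.getOrElse(Double.MinValue)
      )
      state.update(max)
      Some((key, max))
    }
  }
).initialState(initial).timeout(Minutes(60))

inputStream.mapWithState(spec).print()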
Upvotes: 3