Reputation: 509
I am trying the persist feature in Spark to persist the data in memory and do computations on it.
In my case I want to persist only 2GB DStream. this is my code :
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
val windowed = test.reduceByKey(((a:Int,b:Int) => (a + b)))
windowed.persist(MEMORY_ONLY_SER)
When I reach 2GB, I do another treatment, and I use unpersist to free memory.
Could someone help me with how to know how much I persisted?
and if I know how much I persisted, how can I use it as condution ((if PERSISTED == 2 GB) do treatment)?
Upvotes: 1
Views: 1657
Reputation: 37435
In Spark Streaming, persist
or cache
will not continuously add data to memory for each batch of data. What it does is indicate that the underlying RDD should be cached so that further operations on it are applied to that memorized computation of the RDD. So, it's not a cumulative process.
If you want to accumulate data, you could do something like this:
var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
addedRDD = addedRDD union rdd
addedRDD.cache()
}
This will eventually get you in trouble as this RDD will grow on size and complexity (I encourage you to inspect it in the Spark UI after few iterations)
To get the memory used, you should use the metrics interface. I would guess that you are looking for BlockManager.memory.memUsed_MB
but I could be wrong.
That said, relying on JVM memory metrics to trigger some work seems like a bad idea to me, as in-mem size depends on internal structures used to hold the data and will not accurately reflect the actual size of the data.
I would rather figure out metrics based on record count x record size
.
Once we have a trigger metric, we could 'purge' the collected data using that condition:
var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
addedRDD = addedRDD union rdd
addedRDD.cache()
if (addedRDD.count() > SomeSize) {
writeToStorage(addedRDD)
addedRDD.unpersist(true)
addedRDD=sparkContext.emptyRDD()
}
}
(*) code provided as guidance only.
Upvotes: 1