B. Smith

Reputation: 1193

How can unpersisting an RDD cause an RPC timeout?

I have a very large RDD that I am caching (it still fits in memory), but since it is so big, I want to unpersist it as soon as possible. However, when I call unpersist on it, I get an RPC timeout error:

17/11/21 23:25:55 INFO BlockManager: Removing RDD 171
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:135)
        at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1793)
        at org.apache.spark.rdd.RDD.unpersist(RDD.scala:216)

17/11/21 23:27:55 WARN BlockManagerMaster: Failed to remove RDD 171 - Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout

The code that is triggering this error looks like the following:

val transformation1 = firstTransformation(inputData).cache
log("Transformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
transformation1.unpersist()

Unpersisting an RDD should be a relatively inexpensive action. How can unpersisting an RDD cause an RPC timeout?

Upvotes: 5

Views: 1904

Answers (2)

Ged

Reputation: 18108

A slightly more comprehensive answer, as this is most likely a version-dependent issue you are encountering; the default behavior has changed:

From the JIRA:

The RDD and DataFrame .unpersist() method, and Broadcast .destroy() method, take an optional 'blocking' argument. The default was 'false' in all cases except for (Scala) RDDs and their GraphX subclasses.

The default is now 'false' (non-blocking) in all of these methods.

Pyspark's RDD and Broadcast classes now have an optional 'blocking' argument as well, with the same behavior.
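The quoted change can be seen directly in the RDD API. As a minimal sketch (assuming a local-mode SparkContext for illustration only; the question runs on a cluster, and the `blocking` default depends on the Spark version), the flag can be passed explicitly so the call returns without waiting for executors:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object UnpersistDemo {
  def main(args: Array[String]): Unit = {
    // Local-mode context for illustration only (an assumption, not the asker's setup).
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unpersist-demo"))
    val rdd = sc.parallelize(1 to 1000).cache()
    rdd.count() // materialize the cache

    // Non-blocking: returns immediately; block removal happens asynchronously,
    // so a slow executor reply cannot surface an RpcTimeoutException here.
    rdd.unpersist(blocking = false)

    // The driver-side storage level is cleared right away either way.
    assert(rdd.getStorageLevel == StorageLevel.NONE)
    sc.stop()
  }
}
```

Passing `blocking = true` instead makes the driver wait (bounded by spark.rpc.askTimeout) for every executor to confirm removal, which is the path that produced the exception above.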

Upvotes: 1

Sarvesh Maurya

Reputation: 1

unpersist() is by default a blocking call (in the Spark version in question), so it waits for the deletion of all of the RDD's blocks. When the driver executes the unpersist instruction, it asks every executor to drop its cached blocks and waits for their responses; an executor's reply can time out for several reasons, such as network issues.

To avoid the exception, make unpersist() non-blocking:

val transformation1 = firstTransformation(inputData).cache
log("Transformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
transformation1.unpersist(blocking = false)

With a non-blocking unpersist, the RDD is merely marked as deleted; its blocks are cleared later by Spark's background cleaner thread or during garbage collection.

Upvotes: 0
