Christos Hadjinikolis

Reputation: 2158

Why does join fail with "java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]"?

I am using Spark 1.5.

I have two dataframes of the form:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF has 766,151 records while linkPersonItemLessThan500DF has 26,694,353 records. Note that I am using repartition(number) on linkPersonItemLessThan500DF since I intend to join these two later on. I then run the join as follows:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))

for which I am getting this output:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
 at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
 at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
 at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
 at $iwC$$iwC$$iwC.<init>(<console>:93)
 at $iwC$$iwC.<init>(<console>:95)
 at $iwC.<init>(<console>:97)
 at <init>(<console>:99)
 at .<init>(<console>:103)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

and I don't understand what the issue is. Is it as simple as increasing the waiting time? Is the join too intensive? Do I need more memory? Is the shuffling too intensive? Can anyone help?

Upvotes: 79

Views: 145547

Answers (5)

user238607

Reputation: 2468

I got this error when I had breeze's leastSquares function in a loop. Spark considered this to be long running and threw timeout exceptions. The solution was to move the task to its own distributed loop.

Upvotes: 0

lasclocker

Reputation: 311

In addition to increasing spark.sql.broadcastTimeout or calling persist() on both DataFrames, you may try:

1. Disable broadcast joins by setting spark.sql.autoBroadcastJoinThreshold to -1.

2. Increase the Spark driver memory by setting spark.driver.memory to a higher value.
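
As a rough sketch of the first option, assuming the Spark 1.5 shell from the question, where `sqlContext` is predefined (on Spark 2.x and later the equivalent call would be `spark.conf.set`). Driver memory cannot be changed this way once the driver JVM is running; it has to be passed at launch, for example `spark-shell --driver-memory 8g`, where `8g` is only a placeholder value.

```scala
// Spark 1.x shell: `sqlContext` already exists.
// A threshold of -1 turns off automatic broadcast joins,
// so the planner has to pick a shuffle-based join instead.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

// Read the value back to confirm the setting took effect.
println(sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold"))
```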

Upvotes: 8

pedromorfeu

Reputation: 1939

In my case, it was caused by a broadcast over a large dataframe:

df.join(broadcast(largeDF))

So, based on the previous answers, I fixed it by removing the broadcast:

df.join(largeDF)

Upvotes: 0

T. Gawęda

Reputation: 16086

This happens because Spark tries to do a Broadcast Hash Join and one of the DataFrames is very large, so sending it takes a long time.

You can:

  1. Set a higher spark.sql.broadcastTimeout to increase the timeout, e.g. spark.conf.set("spark.sql.broadcastTimeout", 36000) (the value is in seconds)
  2. persist() both DataFrames, after which Spark will use a Shuffle Join
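
For the Spark 1.5 setup in the question there is no `spark` session object, so the two options might look like the sketch below. It assumes the shell's predefined `sqlContext` and the DataFrame names from the question; `left` and `right` are illustrative names. Whether the planner really switches to a shuffle join after `persist()` is the claim made above, so it is worth checking the plan with `explain()` rather than taking it for granted.

```scala
// Option 1: raise the broadcast timeout (value in seconds).
sqlContext.setConf("spark.sql.broadcastTimeout", "36000")

// Option 2: mark both sides for caching before joining.
val left  = linkPersonItemLessThan500DF.persist()
val right = libriFirstTable50Plus3DF.persist()

val joined = left.join(right, Seq("family_id"))
joined.explain()                     // inspect which join operator was chosen
joined.take(20).foreach(println)
```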

PySpark

In PySpark, you can set the config when you build the Spark session, as follows:

from pyspark.sql import SparkSession

# Parentheses let the builder chain span several lines.
spark = (
    SparkSession.builder
    .appName("Your App")
    .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
)

Upvotes: 142

Jacek Laskowski

Reputation: 74719

Just to add some code context to the very concise answer from @T. Gawęda.

In your Spark application, Spark SQL chose a broadcast hash join because "libriFirstTable50Plus3DF has 766,151 records", whose estimated size happened to be below the so-called broadcast threshold (defaults to 10MB).

You can control the broadcast threshold using the spark.sql.autoBroadcastJoinThreshold configuration property.

spark.sql.autoBroadcastJoinThreshold Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run.

You can find that particular type of join in the stack trace:

org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)

BroadcastHashJoin physical operator in Spark SQL uses a broadcast variable to distribute the smaller dataset to Spark executors (rather than shipping a copy of it with every task).
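
To check which join operator the planner picked without actually running the join, the physical plan can be printed first. A minimal sketch, assuming the two DataFrames from the question are in scope in the shell; the operator names in the output vary between Spark versions:

```scala
// explain() only prints the physical plan; it does not trigger the join,
// so it returns quickly even when the join itself would time out.
linkPersonItemLessThan500DF
  .join(libriFirstTable50Plus3DF, Seq("family_id"))
  .explain()
```

If the printed plan contains a broadcast operator, the timeout discussed here applies to this query.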

If you used explain to review the physical query plan, you'd notice that the query uses the BroadcastExchangeExec physical operator. This is where you can see the underlying machinery for broadcasting the smaller table (and the timeout).

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}

doExecuteBroadcast is part of the SparkPlan contract that every physical operator in Spark SQL follows; it lets an operator broadcast its result when needed. BroadcastExchangeExec happens to need it.

The timeout parameter is what you are looking for.

private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}

As you can see, you can either disable the timeout completely with a negative value, which means waiting indefinitely for the broadcast variable to be shipped to the executors, or rely on sqlContext.conf.broadcastTimeout, which is exactly the spark.sql.broadcastTimeout configuration property. The default value is 5 * 60 seconds, which you can see in the stack trace:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
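
The same behaviour can be reproduced without Spark. The sketch below uses only the Scala standard library and mirrors the `timeout` logic quoted above; `BroadcastTimeoutSketch`, `toDuration` and `finishesWithin` are hypothetical names for illustration, not Spark APIs. A future that never completes stands in for a broadcast that is too slow.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object BroadcastTimeoutSketch {
  // Same rule as the quoted code: a negative setting means "wait forever".
  def toDuration(timeoutSeconds: Long): Duration =
    if (timeoutSeconds < 0) Duration.Inf else timeoutSeconds.seconds

  // Blocks on `work` for at most `timeout`; returns false if the wait timed out.
  def finishesWithin(work: Future[Int], timeout: Duration): Boolean =
    try { Await.result(work, timeout); true }
    catch { case _: TimeoutException => false }

  def main(args: Array[String]): Unit = {
    // A future that is never completed, standing in for a stalled broadcast.
    val stalled = Promise[Int]().future

    println(toDuration(300))  // the default of 5 * 60 seconds
    println(toDuration(-1))   // the "no timeout" case
    println(finishesWithin(stalled, 100.millis))
    println(finishesWithin(Future.successful(1), toDuration(-1)))
  }
}
```

`Await.result` is what raises the `TimeoutException` in the stack trace: the join itself has not failed, the driver has simply stopped waiting for the broadcast to finish.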

Upvotes: 40
