Reputation: 3782
I do not understand the behaviour of PySpark in the following example.
I have several DataFrames and I join them one after another:
print"users_data"
print users_data.show()
print"calc"
print calc.show()
print"users_cat_data"
print users_cat_data.show()
data1 = calc.join(users_data, ['category_pk','item_pk'], 'leftouter')
print "DATA1"
print data1.show()
data2 = data1.join(users_cat_data, ['category_pk'], 'leftouter')
print "DATA2"
print data2.show()
data3 = data2.join(category_data, ['category_pk'], 'leftouter')
print "DATA3"
print data3.show()
data4 = data3.join(clicks_data, ['category_pk','item_pk'], 'leftouter')
print "DATA4"
print data4.show()
data4.write.parquet(output + '/test.parquet', mode="overwrite")
I expect that a leftouter join will return the left-side DataFrame with the matching rows (if any) from the right-side DataFrame.
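For example, on two tiny made-up DataFrames (just to illustrate the semantics I have in mind; spark is an existing SparkSession), I would expect something like this:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy left side: (category_pk, item_pk, unique_users)
left = spark.createDataFrame(
    [(321, 460, 1), (730, 740, 2)],
    ['category_pk', 'item_pk', 'unique_users'])

# toy right side: (category_pk, unique_users_per_cat); no row for category 730
right = spark.createDataFrame(
    [(321, 258)],
    ['category_pk', 'unique_users_per_cat'])

# every left row is kept; rows with no match get null in the right-side columns
left.join(right, ['category_pk'], 'leftouter').show()
# category 321 should get unique_users_per_cat = 258, category 730 should get null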
Some sample outputs:
users_data
+--------------+----------+-------------------------+
| category_pk| item_pk| unique_users|
+--------------+----------+-------------------------+
| 321| 460| 1|
| 730| 740| 2|
| 140| 720| 10|
users_cat_data
+--------------+-----------------------+
| category_pk| unique_users_per_cat|
+--------------+-----------------------+
| 111| 258|
| 100| 260|
| 750| 9|
However, I observe a different behaviour. I used show() to print out the first 5 rows of all the DataFrames that I use in the join operations. All of the DataFrames contain data, but I get the following error:
None
DATA1
Traceback (most recent call last):
File "mytest.py", line 884, in <module>
args.field1, args.field2, args.field3)
File "mytest.py", line 802, in calc
print data1.show()
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 336, in show
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o802.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
I do not understand why I get the task serialization error at the line print data1.show(). The DataFrames used to create data1 are not empty, and show() is used successfully two lines above that line of code.
Sometimes the job fails at the last line, data4.write.parquet(output + '/test.parquet', mode="overwrite"), and when I delete that line it runs well. But now it fails even earlier, at the line data1.show().
How can I solve this problem? Any help would be really appreciated.
Upvotes: 2
Views: 5434
Reputation: 74779
I think the reason for the top-most org.apache.spark.SparkException: Exception thrown in awaitResult is that the BroadcastExchangeExec physical operator, when asked to broadcast a relation (aka a table), simply timed out after the default 5 minutes of waiting for the broadcast to finish.
That's the low-level background on the meaning of the exception.
Now, you may be asking yourself, why could that be happening in the first place?
Set spark.sql.broadcastTimeout to -1 to disable the timeout completely (that will make the thread wait for the broadcast to finish indefinitely), or increase it to 10 minutes or so.
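For example (just a sketch; spark here is your SparkSession, and the same property could also be passed with --conf at spark-submit time):
# the value is in seconds; "-1" means wait for the broadcast indefinitely
spark.conf.set("spark.sql.broadcastTimeout", "-1")
# or give it more headroom, e.g. 10 minutes
spark.conf.set("spark.sql.broadcastTimeout", "600")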
You could also disable broadcasting a table altogether by setting spark.sql.autoBroadcastJoinThreshold to -1.
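Again just a sketch (spark being your SparkSession):
# "-1" disables automatic broadcast joins; the optimizer will pick a
# non-broadcast strategy (e.g. a sort-merge join) instead
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# you can inspect the current value with
print spark.conf.get("spark.sql.autoBroadcastJoinThreshold")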
That however would just work around a more serious issue that happens with your environment.
My guess is that your YARN cluster (judging by /mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001) is tight on resources, and the network may be sluggish too.
All in all, my guess is that some of the tables in your queries are smaller than the default 10MB threshold, which makes the Spark SQL optimizer choose broadcasting (over other means of distributing datasets across executors).
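You can see which join strategy the optimizer picked by printing the physical plan of one of the joined DataFrames, e.g. data1 from your code:
data1.explain()
# BroadcastExchange / BroadcastHashJoin in the plan means one side was considered
# small enough to broadcast; with the threshold set to -1 you should see a
# SortMergeJoin instead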
I think that there's something more serious going on in the cluster, and you're facing temporary problems until... the admins fix the YARN cluster. Could the cluster have been under more load when you submitted the PySpark application?
I do not understand why I get Task Serialization error
I think you can simply disregard it as a side effect of the earlier issue(s), given how PySpark works under the covers: two processes (i.e. Python and a JVM) communicating over a socket.
Upvotes: 5