Markus
Markus

Reputation: 3782

Why does this PySpark join fail?

I misunderstand the performance of PySpark in the following example.

I have several DataFrame's and I join them consequently.

print"users_data"
print users_data.show()
print"calc"
print calc.show()
print"users_cat_data"
print users_cat_data.show()

data1 = calc.join(users_data, ['category_pk','item_pk'], 'leftouter')
print "DATA1"
print data1.show()
data2 = data1.join(users_cat_data, ['category_pk'], 'leftouter')
print "DATA2"
print data2.show()
data3 = data2.join(category_data, ['category_pk'], 'leftouter')
print "DATA3"
print data3.show()
data4 = data3.join(clicks_data, ['category_pk','item_pk'], 'leftouter')
print "DATA4"
print data4.show()

data4.write.parquet(output + '/test.parquet', mode="overwrite")

I expect that leftouter joining will return the left side DataFrame with matchings (if any) from the right side DataFrame.

Soma sample outputs:

users_data
+--------------+----------+-------------------------+
|   category_pk|   item_pk|             unique_users|
+--------------+----------+-------------------------+
|           321|       460|                        1|
|           730|       740|                        2|
|           140|       720|                       10|


users_cat_data
+--------------+-----------------------+
|   category_pk|   unique_users_per_cat|
+--------------+-----------------------+
|           111|                    258|
|           100|                    260|
|           750|                      9|

However, I observe a different behaviour. I used show() to print out the first 5 rows of all DataFrame's that I use in joining operations. ALL DataFrames contain data. But I get the following error:

None
DATA1
Traceback (most recent call last):
  File "mytest.py", line 884, in <module>
    args.field1, args.field2, args.field3)
  File "mytest.py", line 802, in calc
    print data1.show()
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 336, in show
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o802.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 

Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)

I do not understand why I get Task Serialization error at line print data1.show(). The DataFrame's used to create ata1 are not empty. Also, show() is successfully used 2 lines above this line of code.

Sometimes it fails at the last line data4.write.parquet(output + '/test.parquet', mode="overwrite") and when I delete it, it runs well. But now it fails even earlier on the line data1.show().

How to solve this problem. Any help will be really appreciated.

Upvotes: 2

Views: 5434

Answers (1)

Jacek Laskowski
Jacek Laskowski

Reputation: 74779

I think the reason for the top-most org.apache.spark.SparkException: Exception thrown in awaitResult is that while requesting BroadcastExchangeExec physical operator for broadcasting a relation (aka table) it simply timed out (after the default 5 minutes of waiting until it finishes).

That's the low-level background on the meaning of the exception.

Now, you may be asking yourself, why could that be happening in the first place?

Set spark.sql.broadcastTimeout as -1 to disable the timeout completely (that will cause the thread to wait for the broadcast to finish indefinitely) or increase it to 10 or so minutes.

You could also disable broadcasting a table by setting spark.sql.autoBroadcastJoinThreshold to -1.

That however would just work around a more serious issue that happens with your environment.

My guess is that your YARN cluster (guessing by /mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001) is tight on resources and the network may be sluggish too.

All in all, my guess is that some tables in your queries are lower than the default 10MB that causes Spark SQL optimizer to choose broadcasting (over other means of distributing datasets over executors).

I think that there's something more serious going on in the cluster and you're facing some temporary problems until...the admins fix the YARN cluster. Could the cluster be under more load when you submitted the PySpark application?

I do not understand why I get Task Serialization error

I think you can simply disregard it as a side effect of the earlier issue(s) given how PySpark works under the covers with two processes (i.e. Python and a JVM) communicating over a socket.

Upvotes: 5

Related Questions