Tsar Bomba
Tsar Bomba

Reputation: 1106

Apache Spark simple join results in cryptic error

I have two Datasets which I'm able to query and show() individually. One has 17 records and another has 3.

Dataset<Row> attReader = spark
    .read()
    .format("org.apache.spark.sql.cassandra")
    .option("table", "table_1")
    .load();

Dataset<Row> surReader = spark
    .read()
    .format("org.apache.spark.sql.cassandra")
    .option("table", "table_2")
    .load();

When I attempt to join and show them like so:

    Dataset<Row> joined = attReader.join(surReader,
        attReader.col("key_field").equalTo(surReader.col("key_field")), "inner");
    joined.show();

I'm sure these fields are correct since I can show the data for the individual Datasets and see them. Join fields are strings.

I get the following exception, which doesn't offer much help:

org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:232)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
    at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:85)
    at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:206)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
    at org.apache.spark.sql.execution.RowDataSourceScanExec.consume(DataSourceScanExec.scala:77)
    at org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(DataSourceScanExec.scala:125)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.RowDataSourceScanExec.produce(DataSourceScanExec.scala:77)
    at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:97)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:39)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
    at com.kilonova.CassandraStream.sparkSql(CassandraStream.java:111)
    at com.kilonova.CassandraStream.init(CassandraStream.java:174)
    at com.kilonova.Main.runStream(Main.java:20)
    at com.kilonova.Main.main(Main.java:14)
Caused by: java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:304)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

Upvotes: 0

Views: 713

Answers (2)

Ramdev Sharma
Ramdev Sharma

Reputation: 1014

This might happening due to JDK version. You might not be using JDK 8.

Please check this thread for more information.

This thread might help. Spark Java IllegalArgumentException at org.apache.xbean.asm5.ClassReader

Upvotes: 1

Constantine
Constantine

Reputation: 1406

Spark tries to broadcast one of the smaller tables in the join if its size is less than the value specified by the spark.sql.autoBroadcastJoinThreshold. This property is set to 10 MB by default.

One of the reasons why you would get this error is because of the broadcast timeout. This property is controlled by the spark.sql.broadcastTimeout and is set to 300 seconds by default.

To resolve your issue, you can either do one of the following

  1. Disable the broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1. But this might impact query performance.
  2. Increase the timeout value if you want to broadcast the table.

I have seen cases where Spark doesn't have table statistics and it might try to broadcast the bigger of the two tables in the join. This would degrade the performance. You can identify this issue by getting the query plan and finding out which table is getting broadcasted.

dataframe.queryExecution.sparkPlan

If you find out that spark is broadcasting the bigger table, then you should calculate statistics for the table for better query optimization. You can go with one of the following ways:

  1. Run analyze table command through Spark. If you have a Hive/Impala service running using the same metastore as Spark, you can run Analyze table through Hive or Compute Statistics through Impala.

  2. You can cache the table before you do a join on it. This way Spark calculates the stats for the table on the fly.

For further reference on analyze table statistics :

Let me know if it helped you.

Upvotes: 1

Related Questions