Reputation: 478
I am creating a dataframe using Apache Spark version 2.3.1. When I try to count the dataframe, I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 12, analitik11.{hostname}, executor 1): java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:98)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.datasourcev2scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
... 49 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:98)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.datasourcev2scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
We use com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder to connect to and read tables from Hive. The code that generates the dataframe is as follows:
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()

val edgesTest = hive.executeQuery(
  "select trim(s_vno) as src ,trim(a_vno) as dst, share, administrator, account, all_share " +
  "from ebyn.babs_edges_2018 where (share <> 0 or administrator <> 0 or account <> 0 or all_share <> 0) and trim(date) = '201801'")

val share_org_edges = edgesTest.alias("df1")
  .join(edgesTest.alias("df2"), "src")
  .where("df1.dst <> df2.dst")
  .groupBy(
    greatest("df1.dst", "df2.dst").as("src"),
    least("df1.dst", "df2.dst").as("dst"))
  .agg(
    max("df1.share").as("share"),
    max("df1.administrator").as("administrator"),
    max("df1.account").as("account"),
    max("df1.all_share").as("all_share"))
  .persist

share_org_edges.count
Table properties are as follows:
CREATE TABLE `EBYN.BABS_EDGES_2018`(
`date` string,
`a_vno` string,
`s_vno` string,
`amount` double,
`num` int,
`share` int,
`share_ratio` int,
`administrator` int,
`account` int,
`share-all` int)
COMMENT 'Imported by sqoop on 2018/10/11 11:10:16'
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='',
'line.delim'='\n',
'serialization.format'='')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://ggmprod/warehouse/tablespace/managed/hive/ebyn.db/babs_edges_2018'
TBLPROPERTIES (
'bucketing_version'='2',
'transactional'='true',
'transactional_properties'='insert_only',
'transient_lastDdlTime'='1539245438')
Upvotes: 2
Views: 1752
Reputation: 1
I faced the same problem, and even after disabling data pruning / pushdown it did not work.
The documentation is found here, under "Pruning and Pushdowns": https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/integrating-hive/content/hive-read-write-operations.html
In Python I set: spark.conf.set('spark.datasource.hive.warehouse.disable.pruning.and.pushdowns', 'true')
But this doesn't work. Instead, I found a solution/workaround, which is to persist one of the tables (the one identified as faulty):
df1 = df.filter(xx).join(xx).persist()
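Applied to the question's Scala code, the same workaround might look roughly like this (an untested sketch, simply persisting edgesTest before the self-join):

val edgesCached = edgesTest.persist()
edgesCached.count() // force materialization so the self-join reads the cached data
val share_org_edges = edgesCached.alias("df1")
  .join(edgesCached.alias("df2"), "src")
  .where("df1.dst <> df2.dst")
  // ... same groupBy/agg as in the question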
From the documentation, my guess is that Spark does projection pushdown against the parent dataframe, and this error occurs when joining a dataframe with itself. Can someone explain it?
Also, let me know if it works.
Upvotes: 0
Reputation: 336
Problem
edgesTest is a dataframe whose logical plan contains a single DataSourceV2Relation node. This DataSourceV2Relation node holds a mutable HiveWarehouseDataSourceReader that will be used to read the Hive table.
The edgesTest dataframe is used two times: as df1 and as df2.
During logical plan optimization, Spark applies column pruning twice to the same mutable HiveWarehouseDataSourceReader instance; the second pruning overwrites the first one by setting its own required columns.
During execution, the reader therefore fires the same query to the Hive warehouse twice, each time with only the columns required by the second pruning, and the Spark-generated code does not find the columns it expects in the query result.
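As a rough illustration of the mechanism (a toy stand-in written for this answer, not the real HiveWarehouseDataSourceReader), a mutable reader that only remembers the last schema it was pruned to behaves like this:

import org.apache.spark.sql.types.StructType

// Toy stand-in, NOT the real HiveWarehouseDataSourceReader: it only shows that
// one mutable instance keeps the *last* required schema it was given.
class ToyMutableReader(fullSchema: StructType) {
  private var requiredSchema: StructType = fullSchema

  // Same shape as SupportsPushDownRequiredColumns.pruneColumns in Spark 2.3's DataSourceV2 API.
  def pruneColumns(schema: StructType): Unit = {
    requiredSchema = schema // a second pruning silently overwrites the first
  }

  def readSchema(): StructType = requiredSchema
}

Because df1 and df2 are planned against the same instance, only the columns required for df2 survive, and the generated code later asks ColumnarBatch for a column index that is not in the returned batch, hence the ArrayIndexOutOfBoundsException.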
Solutions
Spark 2.4
DataSourceV2 has been improved, especially by SPARK-23203 (DataSourceV2 should use immutable trees).
Spark 2.3
Disable column pruning in the HiveWarehouseConnector datasource reader.
Hortonworks has already fixed this issue, as stated in the HDP 3.1.5 Release Notes.
The fix can be found in the HiveWarehouseConnector GitHub repository:
if (useSpark23xReader) {
  LOG.info("Using reader HiveWarehouseDataSourceReaderForSpark23x with column pruning disabled");
  return new HiveWarehouseDataSourceReaderForSpark23x(params);
} else if (disablePruningPushdown) {
  LOG.info("Using reader HiveWarehouseDataSourceReader with column pruning and filter pushdown disabled");
  return new HiveWarehouseDataSourceReader(params);
} else {
  LOG.info("Using reader PrunedFilteredHiveWarehouseDataSourceReader");
  return new PrunedFilteredHiveWarehouseDataSourceReader(params);
}
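With an HWC build that includes this fix, a minimal sketch of opting into the disabled-pruning reader from Scala (using the flag quoted in the other answer) could look like:

// Sketch only: assumes an HWC build containing the fix above, which honors this flag.
spark.conf.set("spark.datasource.hive.warehouse.disable.pruning.and.pushdowns", "true")
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
// ...then run the same executeQuery and self-join as in the question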
Also, the HDP 3.1.5 Hive integration documentation specifies:
To prevent data correctness issues in this release, pruning and projection pushdown is disabled by default.
...
To prevent these issues and ensure correct results, do not enable pruning and pushdowns.
Upvotes: 1