Reputation: 99
I have the following code, which inserts data into the table txnaggr_rt_fact. The table is partitioned by two columns, txninterval and intervaltype, and I have enabled dynamic partitioning in Spark SQL. If the partition already exists, the data is inserted without any problem, but if the partition doesn't exist, an exception is raised.
SparkSession spark = SparkSession.builder().appName("Java Spark Hive Example")
        .config("spark.sql.warehouse.dir", "hdfs://localhost:8020/user/hive/warehouse")
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .enableHiveSupport()
        .getOrCreate();

spark.sql("use nadb");
spark.sql("show tables").show();
spark.sql("insert into table txnaggr_rt_fact partition(txninterval='2018-09-03',intervaltype='test') values('1','2','3',4)"); // (line 113) exception is raised here because the partition doesn't exist
This is the exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
java.lang.NullPointerException: null;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:843)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:249)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
at com.cw.na.spark.HiveSqlTest.main(HiveSqlTest.java:113)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:3412)
at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1650)
at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1579)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.sql.hive.client.Shim_v0_14.loadPartition(HiveShim.scala:836)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply$mcV$sp(HiveClientImpl.scala:741)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)
at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:739)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply$mcV$sp(HiveExternalCatalog.scala:855)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
... 25 more
I am also running the Hive metastore service, started with hive --service metastore.
I have the following properties in hive-site.xml in the Spark conf folder:
<property>
  <name>hive.exec.dynamic.partition</name>
  <value>true</value>
  <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
</property>
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
  <description>
    In strict mode, the user must specify at least one static partition
    in case the user accidentally overwrites all partitions.
    In nonstrict mode all partitions are allowed to be dynamic.
  </description>
</property>
The property hive.exec.dynamic.partition.mode was originally set to strict; after realizing that, I changed it to nonstrict. I didn't restart Spark afterwards, but I did stop and restart the metastore. Do I need to restart Spark as well? What else am I missing in my code?
Following is the schema of txnaggr_rt_fact:

channelid       string
chaincodeid     string
chaincodefcn    string
count           int
txninterval     date
intervaltype    string

# Partition Information
# col_name      data_type    comment

txninterval     date
intervaltype    string
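For reference, that schema corresponds to a table definition along these lines (a sketch reconstructed from the output above; the storage format and any other table properties are not shown in the question):

create table txnaggr_rt_fact (
    channelid string,
    chaincodeid string,
    chaincodefcn string,
    `count` int
)
partitioned by (txninterval date, intervaltype string);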
Any help is appreciated. Thanks.
Upvotes: 0
Views: 857
Reputation: 3544
If you're sure the problem is the absence of the target partition, you can issue the following statement before inserting your data:
alter table txnaggr_rt_fact add if not exists partition (txninterval=<value>, intervaltype=<value>)
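In the asker's Java code, with the values from the question, that would look something like this (a sketch assuming the same spark session as above):

// create the partition up front so the subsequent insert cannot hit a missing partition
spark.sql("alter table txnaggr_rt_fact add if not exists partition (txninterval='2018-09-03', intervaltype='test')");
spark.sql("insert into table txnaggr_rt_fact partition(txninterval='2018-09-03',intervaltype='test') values('1','2','3',4)");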
Upvotes: 1