Gowtham Kesa

Reputation: 99

Java Spark to Hive table: exception on insert into dynamic partition

I have the following code, which inserts data into the table txnaggr_rt_fact, partitioned by two columns: txninterval and intervaltype. I have enabled dynamic partitioning in Spark SQL. If the partition already exists, the insert works and the data lands in the table; if the partition does not yet exist, an exception is raised.

SparkSession spark = SparkSession.builder().appName("Java Spark Hive Example")
        .config("spark.sql.warehouse.dir", "hdfs://localhost:8020/user/hive/warehouse")
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .enableHiveSupport().getOrCreate();
spark.sql("use nadb");
spark.sql("show tables").show();
spark.sql("insert into table txnaggr_rt_fact partition(txninterval='2018-09-03',intervaltype='test') values('1','2','3',4)"); // HiveSqlTest.java:113 in the trace below: exception raised here when the partition doesn't exist

This is the exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: 

java.lang.NullPointerException: null;
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
        at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:249)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.cw.na.spark.HiveSqlTest.main(HiveSqlTest.java:113)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:3412)
        at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1650)
        at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1579)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.sql.hive.client.Shim_v0_14.loadPartition(HiveShim.scala:836)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply$mcV$sp(HiveClientImpl.scala:741)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)
        at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply$mcV$sp(HiveExternalCatalog.scala:855)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
        ... 25 more

I am also running the Hive metastore service, started with hive --service metastore.

I have the following properties in the hive-site.xml in Spark's conf folder:

  <property>
    <name>hive.exec.dynamic.partition</name>
    <value>true</value>
    <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
  </property>
  <property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
    <description>
      In strict mode, the user must specify at least one static partition
      in case the user accidentally overwrites all partitions.
      In nonstrict mode all partitions are allowed to be dynamic.
    </description>
  </property>
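
For Spark to reach a standalone metastore started with hive --service metastore, the same hive-site.xml normally also points at it via hive.metastore.uris. A sketch of that entry, assuming the conventional Thrift port 9083 (adjust host and port to your deployment):

  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>Thrift URI of the standalone Hive metastore; 9083 is the conventional default port.</description>
  </property>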

The property hive.exec.dynamic.partition.mode was originally set to strict; after realizing this, I changed it to nonstrict. I did not restart Spark afterwards, but I did stop and restart the metastore. Do I need to restart Spark as well? What else am I missing in my code?
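
One quick way to answer the restart question is to ask the running session which value it actually resolves; the .config(...) calls at session build time should take precedence over hive-site.xml anyway, so if the following prints nonstrict, the file change alone is not the blocker:

spark.sql("SET hive.exec.dynamic.partition").show(false);      // expect: true
spark.sql("SET hive.exec.dynamic.partition.mode").show(false); // expect: nonstrict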

Following is the schema of txnaggr_rt_fact:

channelid               string
chaincodeid             string
chaincodefcn            string
count                   int
txninterval             date
intervaltype            string

# Partition Information
# col_name              data_type               comment

txninterval             date
intervaltype            string

Need help. Thanks

Upvotes: 0

Views: 857

Answers (1)

mangusta

Reputation: 3544

If you are sure the cause is the absence of the target partition, you can issue the following statement before inserting your data:

alter table txnaggr_rt_fact add if not exists partition (txninterval=<value>, intervaltype=<value>)
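
Applied to the table and values from the question, a minimal sketch of the guarded insert (same SparkSession as above; ADD IF NOT EXISTS is a no-op when the partition already exists, so it is safe to run before every insert):

spark.sql("ALTER TABLE txnaggr_rt_fact ADD IF NOT EXISTS PARTITION (txninterval='2018-09-03', intervaltype='test')");
spark.sql("insert into table txnaggr_rt_fact partition(txninterval='2018-09-03',intervaltype='test') values('1','2','3',4)");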

Upvotes: 1
