Holm

Reputation: 3365

PutHiveStreaming processor in NiFi throws NPE

I'm debugging a custom Hive processor that follows the official PutHiveStreaming processor, except that it writes to Hive 2.x instead of 3.x. The flow runs on a NiFi 1.7.1 cluster. Although this exception is thrown, the data is still written to Hive.

The exception is:

java.lang.NullPointerException: null
    at org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook.getFilteredObjects(AuthorizationMetaStoreFilterHook.java:77)
    at org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook.filterDatabases(AuthorizationMetaStoreFilterHook.java:54)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabases(HiveMetaStoreClient.java:1147)
    at org.apache.hive.hcatalog.common.HiveClientCache$CacheableHiveMetaStoreClient.isOpen(HiveClientCache.java:471)
    at sun.reflect.GeneratedMethodAccessor1641.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:169)
    at com.sun.proxy.$Proxy308.isOpen(Unknown Source)
    at org.apache.hive.hcatalog.common.HiveClientCache.get(HiveClientCache.java:270)
    at org.apache.hive.hcatalog.common.HCatUtil.getHiveMetastoreClient(HCatUtil.java:558)
    at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:95)
    at org.apache.hive.hcatalog.streaming.StrictJsonWriter.<init>(StrictJsonWriter.java:82)
    at org.apache.hive.hcatalog.streaming.StrictJsonWriter.<init>(StrictJsonWriter.java:60)
    at org.apache.nifi.util.hive.HiveWriter.lambda$getRecordWriter$0(HiveWriter.java:91)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.nifi.util.hive.HiveWriter.getRecordWriter(HiveWriter.java:91)
    at org.apache.nifi.util.hive.HiveWriter.<init>(HiveWriter.java:75)
    at org.apache.nifi.util.hive.HiveUtils.makeHiveWriter(HiveUtils.java:46)
    at org.apache.nifi.processors.hive.PutHive2Streaming.makeHiveWriter(PutHive2Streaming.java:1152)
    at org.apache.nifi.processors.hive.PutHive2Streaming.getOrCreateWriter(PutHive2Streaming.java:1065)
    at org.apache.nifi.processors.hive.PutHive2Streaming.access$1000(PutHive2Streaming.java:114)
    at org.apache.nifi.processors.hive.PutHive2Streaming$1.lambda$process$2(PutHive2Streaming.java:858)
    at org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
    at org.apache.nifi.processors.hive.PutHive2Streaming$1.process(PutHive2Streaming.java:855)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2211)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2179)
    at org.apache.nifi.processors.hive.PutHive2Streaming.onTrigger(PutHive2Streaming.java:808)
    at org.apache.nifi.processors.hive.PutHive2Streaming.lambda$onTrigger$4(PutHive2Streaming.java:672)
    at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
    at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
    at org.apache.nifi.processors.hive.PutHive2Streaming.onTrigger(PutHive2Streaming.java:672)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I would also like to reproduce the error. Would a test using TestRunners.newTestRunner(processor) be able to catch it? I'm referring to the test case for Hive 3.x: https://github.com/apache/nifi/blob/ea9b0db2f620526c8dd0db595cf8b44c3ef835be/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
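
For reference, here is a minimal sketch of such a unit test, modeled on the linked test class. The property descriptors (METASTORE_URI, DB_NAME, TABLE_NAME) and the REL_SUCCESS relationship are assumptions that mirror PutHiveStreaming's and may be named differently in the custom processor. Note that the linked test mocks out the Hive writer, so a pure unit test may never reach the metastore code path that throws this NPE:

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Before;
    import org.junit.Test;

    public class TestPutHive2Streaming {

        private TestRunner runner;

        @Before
        public void setUp() {
            // PutHive2Streaming is the custom processor from the stack trace
            runner = TestRunners.newTestRunner(new PutHive2Streaming());
            runner.setProperty(PutHive2Streaming.METASTORE_URI, "thrift://localhost:9083");
            runner.setProperty(PutHive2Streaming.DB_NAME, "default");
            runner.setProperty(PutHive2Streaming.TABLE_NAME, "users");
        }

        @Test
        public void onTriggerRoutesJsonToSuccess() {
            // The custom processor uses StrictJsonWriter, so enqueue JSON
            runner.enqueue("{\"name\":\"joe\",\"favorite_number\":3}".getBytes());
            runner.run();
            runner.assertTransferCount(PutHive2Streaming.REL_SUCCESS, 1);
        }
    }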

The other way is to run Hive 2.x and NiFi in containers locally. But then I have to run docker cp to copy the NAR package built by Maven, and attach a remote JVM from IntelliJ, as this blog describes: https://community.hortonworks.com/articles/106931/nifi-debugging-tutorial.html
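
For what it's worth, that workflow boils down to something like the following; the container name and NAR file name are placeholders, while the debug argument is the one that ships commented out in NiFi's conf/bootstrap.conf:

    # Copy the NAR built by Maven into the running NiFi container
    docker cp target/nifi-hive2-nar-1.0.nar nifi:/opt/nifi/nifi-current/lib/

    # In conf/bootstrap.conf, uncomment the remote-debug JVM argument, restart
    # NiFi, and attach IntelliJ's remote debugger to port 8000:
    # java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000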

Has anyone done something similar? Or is there an easier way to debug a custom processor?

Upvotes: 0

Views: 657

Answers (2)

Holm

Reputation: 3365

The NPE doesn't show up after setting hcatalog.hive.client.cache.disabled to true.

The Kafka Connect HDFS connector recommends this setting, too.

From the Kafka Connect HDFS connector documentation: https://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/hdfs_connector.html

As connector tasks are long running, the connections to Hive metastore are kept open until tasks are stopped. In the default Hive configuration, reconnecting to Hive metastore creates a new connection. When the number of tasks is large, it is possible that the retries can cause the number of open connections to exceed the max allowed connections in the operating system. Thus it is recommended to set hcatalog.hive.client.cache.disabled to true in hive.xml.

When Max Concurrent Tasks of PutHiveStreaming is set to more than 1, this property is automatically set to false.

A NiFi support article has also already addressed this issue:

The NiFi PutHiveStreaming processor has a pool of connections and is therefore multithreaded; setting hcatalog.hive.client.cache.disabled to true allows each connection to set its own Session without relying on the cache.

ref: https://community.hortonworks.com/content/supportkb/196628/hive-client-puthivestreaming-fails-against-partiti.html
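
For reference, this is what the entry would look like in the hive-site.xml picked up by the processor's Hive Configuration Resources property (the Confluent doc calls the file hive.xml, but hive-site.xml is the usual name):

    <property>
      <name>hcatalog.hive.client.cache.disabled</name>
      <value>true</value>
    </property>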

Upvotes: 0

mattyb

Reputation: 12083

This is a red-herring error; there's some issue on the Hive side where it can't get its own IP address or hostname, and it issues this error periodically as a result. However, I don't think it causes any real problems since, as you said, the data still gets written to Hive.

Just for completeness: in Apache NiFi, PutHiveStreaming is built to work against Hive 1.2.x, not Hive 2.x. There are currently no Hive 2.x-specific processors, and we've never determined whether the Hive 1.2.x processors work against Hive 2.x.

For debugging, if you can run Hive in a container and expose the metastore port (9083 is the default, I believe), then you should be able to create an integration test using things like TestRunners and run NiFi locally from your IDE. This is how integration tests are performed against other external systems such as MongoDB or Elasticsearch.
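
As a sketch of that setup, assuming you have or build a Hive 2.x image (the image name below is a placeholder):

    # Run a Hive 2.x metastore locally and expose the default thrift port,
    # then point the processor's Metastore URI at thrift://localhost:9083
    docker run -d --name hive-metastore -p 9083:9083 my-hive2-image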

There is a MiniHS2 class in the Hive test suite for integration testing, but it is not in a published artifact, so unfortunately we're left having to run tests against a real Hive instance.

Upvotes: 1
