Reputation: 1581
I try to update some DB column in Oracle Database with Nifi.
I have such part of the circuit:
I have problem with last PutDatabaserecord
:
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | 2021-07-12 17:34:20,919 ERROR [Timer-Driven Process Thread-2] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=017a10b4-fe2c-1b89-f752-67545ebb8406] Failed to put Records to database for StandardFlowFileRecord[uuid=db4a0554-6ea8-4cca-b1da-11bdacef1ccc,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1625839380170-6543, container=default, section=399], offset=292967, length=68],offset=0,name=f4d1875f-bec4-4944-ad8e-7955048148f3,size=68]. Routing to failure.: java.sql.BatchUpdateException: ORA-01747: invalid user.table.column, table.column, or column specification
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain |
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | java.sql.BatchUpdateException: ORA-01747: invalid user.table.column, table.column, or column specification
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain |
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:10032)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1364)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9839)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:234)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.lang.reflect.Method.invoke(Method.java:498)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at com.sun.proxy.$Proxy149.executeBatch(Unknown Source)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.executeDML(PutDatabaseRecord.java:754)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.putToDatabase(PutDatabaseRecord.java:841)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.onTrigger(PutDatabaseRecord.java:487)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain | at java.lang.Thread.run(Thread.java:748)
This is configuration of the problematic node:
This is a schema of RecordReader:
{
"name": "load_date",
"type": "record",
"namespace": "maxi",
"fields": [
{
"name": "doc_id",
"type": "int"
},
{
"name": "line_id",
"type": "int"
},
{
"name": "load_date",
"type": "string"
}
]
}
And this is a sample of the json data coming to the node:
[{"doc_id":1795576199,"line_id":689617855,"load_date":"2021-34-12"}]
UPDATE
OK, I set PutDatabaseRecord to Debug mode, evaluated sigle record? for catch full debug info from the processor. This is the head of the log exactly as the processor starts to handle the record:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,951 INFO [NiFi Web Server-456] o.a.n.c.s.StandardProcessScheduler Starting SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e]
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,951 INFO [NiFi Web Server-456] o.a.n.controller.StandardProcessorNode Starting SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e]
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,964 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e] to run with 1 threads
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:15:23,976 ERROR [Timer-Driven Process Thread-1] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=017a10b4-fe2c-1b89-f752-67545ebb8406] Failed to put Records to database for StandardFlowFileRecord[uuid=9c53fd8b-775b-42a6-bcae-fb4208a40985,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1626174735665-61, container=default, section=61], offset=776075, length=330],offset=0,name=51510f97-7a54-4394-966a-12775653acb0,size=66]. Routing to failure.: java.sql.SQLDataException: Cannot map field 'doc_id' to any column in the database
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | Columns:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | java.sql.SQLDataException: Cannot map field 'doc_id' to any column in the database
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | Columns:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.generateUpdate(PutDatabaseRecord.java:1073)
(I added one another processor there - SplitJson
)
And, I added some table in my own scheme on the DB.
This is the DDL:
CREATE TABLE psheom.ml_task (
doc_id number,
line_id number,
load_date DATE,
CONSTRAINT pk_ml_task PRIMARY KEY(doc_id, line_id)
)
And this is result of DESCRIBE psheom.ml_task
:
Name Null? Type
--------- -------- ------
DOC_ID NOT NULL NUMBER
LINE_ID NOT NULL NUMBER
LOAD_DATE DATE
UPDATE
I try to make ExecuteSql processor as @pmdba suggests, but I get some error. Here is configuration:
And here is error:
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | 2021-07-13 14:57:19,137 ERROR [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.ExecuteSQL ExecuteSQL[id=9fb97e49-017a-1000-bae9-256f569c239b] Unable to execute SQL select query describe psheom.ml_task due to java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | . No FlowFile to route to failure: java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain |
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain |
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain | at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:494)
UPDATE
I set Translate Field Names
to false
, but it is same result.
Upvotes: 0
Views: 808
Reputation: 1581
OK, so it is quite a "few things" that we need to do for UPDATE
some record in Oracle
.
First of all DO NOT use DBeaver! It is very glitchy! Use just SQLDeveloper for working with Oracle.
Second, create session with same user configured for connection pool in Nifi side, and run DESCRIBE
for table you need to update:
DESCRIBE GOODS.ML_TASK
Be sure, it succeeds!
Third, we need upper case all entity names, and it is good to set Translate Field Names
to false
, thus it removes underscores, ie.: doc_id -> docid
.
Firth, you need json
with upper case field names in the processor's input:
[
{"DOC_ID":1799041400,"LINE_ID":694098344,"LOAD_DATE":"14-Jul-21"},
{"DOC_ID":1802019315,"LINE_ID":697885808,"LOAD_DATE":"14-Jul-21"}
]
If you have lower case field in some place, use UpdateAttribute
processor with reader and writer where reader will upper case the fields:
The schema defined for PutDatabaseRecord reader MUST be upper cased:
{
"name": "load_date",
"type": "record",
"namespace": "maxi",
"fields": [
{
"name": "DOC_ID",
"type": "int"
},
{
"name": "LINE_ID",
"type": "int"
},
{
"name": "LOAD_DATE",
"type": "string"
}
]
}
So here you have:
UpdateRecord
:
PutDatanaseRecord
All of them use THE SAME JSON SCHEMA
Fifth, as you seen, use string
type for date.
You can put date as number of milliseconds sine THE DATE, but, despite the fact it worked in my own schema, it didn't worked in the production schema. So you need to query NLS_DATE_FORMAT
.
Put the appropriate query in ExecuteSQL
processor:
select * from nls_session_parameters where parameter = 'NLS_DATE_FORMAT'
You will have the format in the result queue:
[{"PARAMETER": "NLS_DATE_FORMAT", "VALUE": "DD-MON-RR"}]
Use it for make date formatted in the appropriate way.
Sixth, as we see, put the right date format in the "Expression Language" expression in the right place, for instance, in UpdateRecord
in my case:
Seventh, make sure you have just single or few records passing threw the queues, for be able to monitor the log. Also put the PutDatabaseRecord
in DEBUG
mode:
In such a way, it will inform you, whether it succeed to fetch the schema from DB.
So... I wish you happy Oracle's table update!
Upvotes: 2