Reputation: 395
I am trying to join two different CSV files that share a common column, as shown below.
Csv1: (the content of the generated FlowFile)
Emp_Id,Name,Address,Mobile_No
1,Name1,Add1,Mob1
2,Name2,Add2,Mob2
Csv2: (Given in the CSVRecordLookupService config as the Lookup CSV)
Emp_Id,Salary,Department
1,10k,dev
2,20k,mng
Output needed:
Emp_Id,Name,Address,Mobile_No,Salary,Department
1,Name1,Add1,Mob1,10k,dev
2,Name2,Add2,Mob2,20k,mng
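To make the goal concrete: the result I am after is a plain left join on Emp_Id. Below is a minimal Python sketch of that join outside NiFi. The function name `join_on_emp_id` and the inline sample strings are only illustrative, and this says nothing about how LookupRecord works internally.

```python
import csv
import io

# Sample data copied from the two CSVs above (Department as in the expected output).
CSV1 = """Emp_Id,Name,Address,Mobile_No
1,Name1,Add1,Mob1
2,Name2,Add2,Mob2
"""

CSV2 = """Emp_Id,Salary,Department
1,10k,dev
2,20k,mng
"""


def join_on_emp_id(left_text, right_text, key="Emp_Id"):
    """Left-join two CSV texts on `key`; returns a list of dicts."""
    # Index the lookup CSV by its key column. Values read by csv are strings.
    lookup = {row[key]: row for row in csv.DictReader(io.StringIO(right_text))}
    joined = []
    for row in csv.DictReader(io.StringIO(left_text)):
        merged = dict(row)
        # Append every non-key column from the matching lookup row, if any.
        for col, val in lookup.get(row[key], {}).items():
            if col != key:
                merged[col] = val
        joined.append(merged)
    return joined


rows = join_on_emp_id(CSV1, CSV2)
for r in rows:
    print(",".join(r.values()))
```

Rows without a match in the second CSV would simply keep their original columns in this sketch.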
My configuration of the LookupRecord processor is based on the description here: https://gist.github.com/ijokarumawak/b9c95a0d0c86c97ffeaeb5ef95320b8b
But when I run the flow, I see the following error in the logs:
2020-07-15 19:04:01,603 ERROR [Timer-Driven Process Thread-8] o.a.n.processors.standard.LookupRecord LookupRecord[id=538b171d-0173-1000-fc2e-b228d34dfc53] Failed to process StandardFlowFileRecord[uuid=d32f8354-5849-4c07-b8c3-a00f0bc5abe3,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1594828212922-466, container=default, section=466], offset=214923, length=65],offset=0,name=d32f8354-5849-4c07-b8c3-a00f0bc5abe3,size=65]: org.apache.nifi.processor.exception.ProcessException: Failed to lookup coordinates {key=1} in Lookup Service
org.apache.nifi.processor.exception.ProcessException: Failed to lookup coordinates {key=1} in Lookup Service
at org.apache.nifi.processors.standard.LookupRecord.doResultPathReplacement(LookupRecord.java:395)
at org.apache.nifi.processors.standard.LookupRecord.route(LookupRecord.java:303)
at org.apache.nifi.processors.standard.LookupRecord.route(LookupRecord.java:68)
at org.apache.nifi.processors.standard.AbstractRouteRecord$1.process(AbstractRouteRecord.java:134)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2324)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2292)
at org.apache.nifi.processors.standard.AbstractRouteRecord.onTrigger(AbstractRouteRecord.java:121)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.nifi.lookup.CSVRecordLookupService.lookup(CSVRecordLookupService.java:234)
at org.apache.nifi.lookup.LookupService.lookup(LookupService.java:48)
at sun.reflect.GeneratedMethodAccessor613.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:87)
at com.sun.proxy.$Proxy227.lookup(Unknown Source)
at org.apache.nifi.processors.standard.LookupRecord.doResultPathReplacement(LookupRecord.java:393)
... 18 common frames omitted
What I have tried so far:
Updated the Avro schema used in the CSVRecordWriter (added the needed fields to the schema):
{ "type": "record", "namespace": "nifi", "name": "JoinedEmp", "fields": [ { "name": "Emp_Id", "type": "string" }, { "name": "Name", "type": "string" }, { "name": "Address", "type": "string" }, { "name": "Mobile_No", "type": "string" }, { "name": "Salary", "type": "string" }, { "name": "Department", "type": "string" } ] }
Updated the Result RecordPath to /Emp_Id
Neither worked.
I looked in several places on the web for LookupRecord configuration examples, but could not resolve the error shown here.
Can anyone help me figure out which part of my configuration is wrong?
Thanks in advance.
Note: Updated my question (removed clutter) based on the suggestions received below.
Upvotes: 1
Views: 1270
Reputation: 12083
The lookup service expects the value of the lookup key to be a String, but you're giving it an Integer, which is what the ClassCastException in your log is saying. It's a bit annoying, but the key's value has to be converted to a String, so try the following RecordPath for the value of your key property:
toString( /Emp_Id, "UTF-8")
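As a rough analogy only (plain Python, not NiFi code): a table loaded from CSV is keyed by strings, so looking it up with an integer does not find the row until the key is converted. In NiFi the mismatch surfaces as a ClassCastException rather than a miss, but the cause is the same kind of key-type mismatch. The names `LOOKUP_CSV` and `find` below are hypothetical.

```python
import csv
import io

# Lookup CSV from the question; every value parsed from CSV is a string.
LOOKUP_CSV = """Emp_Id,Salary,Department
1,10k,dev
2,20k,mng
"""

lookup = {row["Emp_Id"]: row for row in csv.DictReader(io.StringIO(LOOKUP_CSV))}


def find(key):
    """Return the lookup row for `key`, or None when there is no match."""
    return lookup.get(key)


int_hit = find(1)        # integer key: no match, because the keys are strings
str_hit = find(str(1))   # converting the key to a string finds the row

print(int_hit)
print(str_hit)
```

Converting the key before the lookup plays the same role here that `toString` plays in the RecordPath above.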
Upvotes: 1