Vizag

NiFi - LookupRecord processor configuration to join multiple columns using two different CSVs failing with error

I am trying to join two different CSV files that share a common column, as shown below.

CSV1 (used to generate the FlowFile):

Emp_Id,Name,Address,Mobile_No
1,Name1,Add1,Mob1
2,Name2,Add2,Mob2

CSV2 (configured in the CSVRecordLookupService as the lookup CSV):

Emp_Id,Salary,Department
1,10k,dev
2,20k,mng

Output needed:

Emp_Id,Name,Address,Mobile_No,Salary,Department
1,Name1,Add1,Mob1,10k,dev
2,Name2,Add2,Mob2,20k,mng
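To make the intended result concrete, here is a plain Java (9+) sketch of the same join done outside NiFi. It only illustrates the expected output and is not how LookupRecord or CSVRecordLookupService work internally; the class and method names are hypothetical, and it assumes simple CSV with no quoted fields or embedded commas. Rows of CSV1 with no matching Emp_Id are simply skipped here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the desired join; not NiFi code.
public class EmpJoinSketch {

    // Builds a table from the lookup CSV: Emp_Id text -> remaining columns.
    // Note that the key is the String form of Emp_Id.
    static Map<String, String> buildLookup(List<String> lookupCsv) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String line : lookupCsv.subList(1, lookupCsv.size())) { // skip header
            int comma = line.indexOf(',');
            table.put(line.substring(0, comma), line.substring(comma + 1));
        }
        return table;
    }

    // Appends the lookup columns to each main row whose Emp_Id has a match.
    static List<String> join(List<String> mainCsv, List<String> lookupCsv) {
        Map<String, String> table = buildLookup(lookupCsv);
        String lookupHeader = lookupCsv.get(0);
        List<String> out = new ArrayList<>();
        // Header: main columns plus the lookup columns except its Emp_Id.
        out.add(mainCsv.get(0) + lookupHeader.substring(lookupHeader.indexOf(',')));
        for (String line : mainCsv.subList(1, mainCsv.size())) {
            String key = line.substring(0, line.indexOf(','));
            String extra = table.get(key);
            if (extra != null) {
                out.add(line + "," + extra);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> csv1 = List.of("Emp_Id,Name,Address,Mobile_No",
                "1,Name1,Add1,Mob1", "2,Name2,Add2,Mob2");
        List<String> csv2 = List.of("Emp_Id,Salary,Department",
                "1,10k,dev", "2,20k,mng");
        join(csv1, csv2).forEach(System.out::println);
    }
}
```

Running main prints the three lines listed under "Output needed". Keying the table by the Emp_Id text is a deliberate choice in this sketch; it mirrors the String-keyed lookup that turns out to matter for the error below.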

My configuration of the LookupRecord processor is based on the description here: https://gist.github.com/ijokarumawak/b9c95a0d0c86c97ffeaeb5ef95320b8b

However, when I run the flow, I see the following error in the logs:

2020-07-15 19:04:01,603 ERROR [Timer-Driven Process Thread-8] o.a.n.processors.standard.LookupRecord LookupRecord[id=538b171d-0173-1000-fc2e-b228d34dfc53] Failed to process StandardFlowFileRecord[uuid=d32f8354-5849-4c07-b8c3-a00f0bc5abe3,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1594828212922-466, container=default, section=466], offset=214923, length=65],offset=0,name=d32f8354-5849-4c07-b8c3-a00f0bc5abe3,size=65]: org.apache.nifi.processor.exception.ProcessException: Failed to lookup coordinates {key=1} in Lookup Service
org.apache.nifi.processor.exception.ProcessException: Failed to lookup coordinates {key=1} in Lookup Service
    at org.apache.nifi.processors.standard.LookupRecord.doResultPathReplacement(LookupRecord.java:395)
    at org.apache.nifi.processors.standard.LookupRecord.route(LookupRecord.java:303)
    at org.apache.nifi.processors.standard.LookupRecord.route(LookupRecord.java:68)
    at org.apache.nifi.processors.standard.AbstractRouteRecord$1.process(AbstractRouteRecord.java:134)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2324)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2292)
    at org.apache.nifi.processors.standard.AbstractRouteRecord.onTrigger(AbstractRouteRecord.java:121)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.nifi.lookup.CSVRecordLookupService.lookup(CSVRecordLookupService.java:234)
    at org.apache.nifi.lookup.LookupService.lookup(LookupService.java:48)
    at sun.reflect.GeneratedMethodAccessor613.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:87)
    at com.sun.proxy.$Proxy227.lookup(Unknown Source)
    at org.apache.nifi.processors.standard.LookupRecord.doResultPathReplacement(LookupRecord.java:393)
    ... 18 common frames omitted

What I tried so far:

  1. Updated the Avro schema used in the CSVRecordSetWriter (added the needed fields to the schema):

    { "type": "record", "namespace": "nifi", "name": "JoinedEmp", "fields": [ { "name": "Emp_Id", "type": "string" }, { "name": "Name", "type": "string" }, { "name": "Address", "type": "string" }, { "name": "Mobile_No", "type": "string" }, { "name": "Salary", "type": "string" }, { "name": "Department", "type": "string" } ] }

  2. Updated Result Record Path to /Emp_Id

Neither of these worked.

I have looked at various LookupRecord configurations and examples on the web, but could not resolve the error I am seeing here.

Can anyone help me identify which configuration I have set incorrectly?

Thanks in advance.

Note: I updated my question (removed clutter) based on the suggestions received below.

Answers (1)

mattyb

The lookup service expects the value of the lookup key to be a String, but you're giving it an Integer. It's a bit annoying, but the key's value has to be converted to a String, so try the following as the value of your key property:

toString( /Emp_Id, "UTF-8")
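Why a plain cast fails can be shown in a few lines of Java (9+). This is not the actual CSVRecordLookupService source; the class, the TABLE contents and the lookup method are hypothetical and only mimic the String cast that the "Caused by: java.lang.ClassCastException" frame points to. The assumption is that the record reader delivers /Emp_Id as an Integer, which matches the {key=1} coordinates and the cast error in the log.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a String-keyed lookup; not NiFi code.
public class LookupKeySketch {

    private static final Map<String, String> TABLE =
            Map.of("1", "10k,dev", "2", "20k,mng");

    static String lookup(Map<String, Object> coordinates) {
        // A raw cast like this throws ClassCastException when the
        // coordinate value is a java.lang.Integer rather than a String.
        String key = (String) coordinates.get("key");
        return TABLE.get(key);
    }

    public static void main(String[] args) {
        Map<String, Object> intKey = new HashMap<>();
        intKey.put("key", 1); // autoboxed to java.lang.Integer
        try {
            lookup(intKey);
        } catch (ClassCastException e) {
            System.out.println("Integer key fails: " + e.getClass().getSimpleName());
        }

        Map<String, Object> stringKey = new HashMap<>();
        // Converting first plays the role of toString(/Emp_Id, "UTF-8").
        stringKey.put("key", String.valueOf(1));
        System.out.println("String key finds: " + lookup(stringKey));
    }
}
```

Running main prints "Integer key fails: ClassCastException" followed by "String key finds: 10k,dev". Converting the value before it reaches the service, rather than changing the writer's schema, is what resolves the cast, which is consistent with the schema change in the question having no effect.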
