Reputation: 59
Example scenario:
CSV file content:
john|doe|1
stacy|doe|2
Database fields:
fname | lname | list_index | raw_text
My objective is to ingest the content of a CSV file and save it to a database using NiFi processors. The expected rows are shown below; note that the raw_text column holds the full original line of each record.
fname | lname | list_index | raw_text
john | doe | 1 | "john|doe|1"
stacy | doe | 2 | "stacy|doe|2"
Upvotes: 1
Views: 809
Reputation: 2878
If you need a lot of data customization, you can use the ExecuteScript processor to manipulate the data. The pipeline should look something like this:
ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord
Configure ExecuteScript as below:
Script Engine: python
Script Body:
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
import sys
import os


class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read every line of the incoming flow file content.
        with wrap(inputStream) as f:
            lines = f.readlines()

        # Start with a header row that names the extra raw_text column.
        updated_lines = ['fname|lname|list_index|raw_text\n']
        for line in lines:
            stripped = line.strip()
            if not stripped:
                # Skip blank lines so they do not become empty records.
                continue
            # Append the whole original line as a quoted fourth field.
            updated_lines.append(stripped + '|"' + stripped + '"\n')

        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(updated_lines)


flow_file = session.get()
if flow_file is not None:
    try:
        session.write(flow_file, PyStreamCallback())
        session.transfer(flow_file, ExecuteScript.REL_SUCCESS)
    except Exception as e:
        # Record where the failure happened, then route to failure.
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        excp = '%s %s:%s' % (exc_type, fname, exc_tb.tb_lineno)
        attrMap = {'exception': excp}
        flow_file = session.putAllAttributes(flow_file, attrMap)
        session.transfer(flow_file, ExecuteScript.REL_FAILURE)
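The line transformation inside the callback can be checked outside NiFi before wiring up the flow. The sketch below is plain Python with no NiFi or Jython imports; the function name append_raw_text and the choice to skip blank lines are assumptions made for illustration, not part of any NiFi API.

```python
HEADER = "fname|lname|list_index|raw_text"


def append_raw_text(lines):
    """Return the header plus each line with its own text appended as a quoted field."""
    out = [HEADER]
    for line in lines:
        stripped = line.strip()
        if not stripped:
            # Assumption: blank lines are dropped rather than turned into records.
            continue
        out.append('%s|"%s"' % (stripped, stripped))
    return out


if __name__ == "__main__":
    for row in append_raw_text(["john|doe|1\n", "stacy|doe|2\n"]):
        print(row)
```

Feeding it the two sample lines from the question should give one header row followed by two rows whose last field repeats the original line in double quotes.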
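Configure PutDatabaseRecord accordingly: its record reader needs to use | as the field delimiter and " as the quote character, and treat the first line as a header, so that the quoted raw_text value is read as a single field.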
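To see why the delimiter and quote settings of the record reader matter, here is a minimal sketch that parses the kind of output the script produces with Python's standard csv module. It only illustrates the parsing rules; it is not how NiFi reads records, and the sample text and the helper name parse_rows are assumptions for the example.

```python
import csv
import io

# Sample of the script output: a header row plus quoted raw_text values.
sample = (
    'fname|lname|list_index|raw_text\n'
    'john|doe|1|"john|doe|1"\n'
    'stacy|doe|2|"stacy|doe|2"\n'
)


def parse_rows(text):
    """Parse pipe-delimited text, treating double quotes as the quote character."""
    reader = csv.DictReader(io.StringIO(text), delimiter="|", quotechar='"')
    return [dict(row) for row in reader]


rows = parse_rows(sample)

if __name__ == "__main__":
    for row in rows:
        print(row)
```

With the quote character set, the pipes inside raw_text stay part of one field. Without it, each row would split into more values than there are columns, so a reader on the NiFi side would presumably need equivalent settings.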
Upvotes: 1