bentoi

How can I read each line of a CSV file and save it, along with its parsed fields, to a database using NiFi processors?

See example scenario:

CSV file content:

john|doe|1
stacy|doe|2

Database columns:

fname | lname | list_index | raw_text

My objective is to ingest the CSV file content and save it to the database using NiFi processors. The sample output below shows the rows I expect in the database, including the original line stored in the raw_text column.

 fname | lname | list_index | raw_text
 john  | doe   | 1          | "john|doe|1"
 stacy | doe   | 2          | "stacy|doe|2"
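The per-line mapping being asked for can be stated as a small function. This is a plain CPython 3 sketch, not NiFi code: the function name `to_row` is hypothetical, it treats the quotes in the table above as delimiting the stored value, and it assumes every line has exactly three |-separated fields.

```python
def to_row(line):
    """Map one pipe-delimited line to (fname, lname, list_index, raw_text).

    Hypothetical helper; assumes exactly three '|'-separated fields per line.
    """
    raw = line.strip()  # drop the newline and surrounding whitespace
    fname, lname, list_index = raw.split("|")
    # raw_text keeps the original line unchanged
    return fname, lname, int(list_index), raw


for line in ["john|doe|1\n", "stacy|doe|2\n"]:
    print(to_row(line))
```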

Vikramsinh Shinde

If you need a lot of data customization, you can use the ExecuteScript processor to manipulate the data. The pipeline should look something like this:

ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord

Configure ExecuteScript as follows:

Script Engine: python (this engine is Jython, so the script uses Python 2.7 syntax and can import Java classes)

Script Body:

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
import sys
import os


class PyStreamCallback(StreamCallback):
    """Adds a header row and a quoted raw_text column to every input line."""

    def process(self, inputStream, outputStream):

        with wrap(inputStream) as f:
            lines = f.readlines()
            # Header row, so the record reader can map fields to column names
            updated_lines = ['fname|lname|list_index|raw_text\n']
            for line in lines:
                raw = line.strip()
                if not raw:
                    continue  # skip blank lines, e.g. a trailing empty line
                # raw_text contains the '|' separator, so wrap it in double quotes
                updated_lines.append(raw + '|"' + raw + '"\n')

            with wrap(outputStream, 'w') as filehandle:
                filehandle.writelines(updated_lines)


# 'session' is bound by ExecuteScript; get the incoming flow file, if any
flow_file = session.get()

if flow_file:
    try:
        session.write(flow_file, PyStreamCallback())
        session.transfer(flow_file, ExecuteScript.REL_SUCCESS)

    except Exception as e:
        # Record what failed and where as an 'exception' attribute
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        excp = '%s: %s (%s, line %d)' % (exc_type, e, fname, exc_tb.tb_lineno)
        attrMap = {'exception': excp}
        flow_file = session.putAllAttributes(flow_file, attrMap)
        session.transfer(flow_file, ExecuteScript.REL_FAILURE)
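To check the transformation before wiring it into NiFi, the same logic (including skipping blank lines) can be run in plain CPython 3. In this sketch the function name `add_raw_text` is hypothetical, and Python's `csv` module only stands in for a NiFi CSVReader configured with | as separator and " as quote character; it is not NiFi's parser, but it shows that the quoted raw_text comes back as a single field.

```python
import csv
import io


def add_raw_text(text):
    """Hypothetical local mirror of the ExecuteScript step: header + quoted raw_text."""
    out = ["fname|lname|list_index|raw_text\n"]
    for line in text.splitlines():
        raw = line.strip()
        if raw:  # skip blank lines
            out.append(raw + '|"' + raw + '"\n')
    return "".join(out)


transformed = add_raw_text("john|doe|1\nstacy|doe|2\n")
print(transformed, end="")

# Parse the result as a '|'-separated file with '"' quotes and a header row
reader = csv.DictReader(io.StringIO(transformed), delimiter="|", quotechar='"')
for record in reader:
    print(record)
```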

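Then configure PutDatabaseRecord to read this output: for example, set its Record Reader to a CSVReader whose Value Separator is | and Quote Character is the double quote ("), with Treat First Line as Header enabled (Schema Access Strategy: Use String Fields From Header). Set Statement Type to INSERT, and point Table Name and the Database Connection Pooling Service at your target table. The quotes around raw_text keep the | characters inside it from being treated as field separators.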
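With Statement Type set to INSERT, PutDatabaseRecord turns each parsed record into a row of the target table. The sketch below only approximates that step with Python's built-in `sqlite3` and an in-memory database; the table name `people` is hypothetical, and this is not how NiFi executes the statements internally.

```python
import csv
import io
import sqlite3

# Output of the ExecuteScript step (header row + quoted raw_text column)
transformed = (
    "fname|lname|list_index|raw_text\n"
    'john|doe|1|"john|doe|1"\n'
    'stacy|doe|2|"stacy|doe|2"\n'
)

conn = sqlite3.connect(":memory:")
# Hypothetical target table with the question's columns
conn.execute(
    "CREATE TABLE people (fname TEXT, lname TEXT, list_index INTEGER, raw_text TEXT)"
)

reader = csv.DictReader(io.StringIO(transformed), delimiter="|", quotechar='"')
# Named placeholders are filled from each record dict by column name;
# SQLite's INTEGER affinity stores the text '1' as the integer 1
conn.executemany(
    "INSERT INTO people (fname, lname, list_index, raw_text) "
    "VALUES (:fname, :lname, :list_index, :raw_text)",
    reader,
)
conn.commit()

for row in conn.execute("SELECT * FROM people ORDER BY list_index"):
    print(row)
```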
