Dario Lurido
Dario Lurido

Reputation: 98

Python script to compare the data types of a CSV in Apache NiFi

I have created this Python script to compare the data types of a CSV coming from the InputStream with an array of data types, but I don't understand why I'm getting an error in line number 7 in NiFi. Does anyone know what I'm doing wrong?

from org.apache.nifi.processors.script import ExecuteScript
import csv
import io
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback
from java.nio.charset import StandardCharsets

class PyInputStreamCallback(InputStreamCallback):
    def __init__(self):
        pass
    
    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Esegui la validazione
        expected_data_types = ['int', 'str', 'str', 'str', 'str']  # array di type attesi
    
        def is_valid_data_type(value, data_type):
            if data_type == 'int':
                try:
                    int(value)
                    return True
                except ValueError:
                    return False
            elif data_type == 'str':
                try:
                    str(value)
                    return True
                except ValueError:
                    return False
            elif data_type == 'float':
                try:
                    float(value)
                    return True
                except ValueError:
                    return False
            else:
                return False
        
        def validate_csv(text, expected_data_types):
            reader = csv.reader(io.StringIO(text))
            next(reader)
            for row in reader:
                for i, value in enumerate(row):
                    if not is_valid_data_type(value, expected_data_types[i]):
                        error_message = "Invalid data type at row {}, column {}: {}".format(reader.line_num, i + 1, value)
                        flowFile = session.putAttribute(flowFile, 'csv.validation.error', error_message)
                        session.transfer(flowFile, ExecuteScript.REL_FAILURE)
                        return
            

        validate_csv(text, expected_data_types)
        session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
    
flowFile = session.get()
if(flowFile != None):
    session.read(flowFile, PyInputStreamCallback())

I would like to read the CSV file and receive an error if the data types between the ones in the array and those in the file are not equal

edit script mod

Upvotes: 0

Views: 139

Answers (0)

Related Questions