likeGreen

Reputation: 1079

Modify csv based on a column value in Nifi by adding a new line

I have a CSV file of the form:

Id, Name, Class
1, Kevin,[Eight, Nine]
2, Mark,Four

How can I create a new CSV like this?

Id, Name, Class
1, Kevin,Eight
1, Kevin,Nine
2, Mark,Four

Basically, if the Class column holds an array of strings, each element should go on its own row, with all other column values replicated. I am able to remove the [ ] brackets by using ReplaceText to replace them with an empty string, so I currently have the following CSV, if that helps:

Id, Name, Class
1, Kevin, Eight, Nine
2, Mark,Four

Upvotes: 1

Views: 660

Answers (2)

likeGreen

Reputation: 1079

I used ExecuteGroovyScript to split the bracketed string [Eight, Nine] on commas. For each element, the script writes the line again with the text before and after the list, so every value gets its own row. A ReplaceText processor then removes [ and ].

import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        outputStream.withWriter("UTF-8") { w ->
            inputStream.eachLine("UTF-8") { line ->
                // Locate the bracketed list, e.g. [Eight, Nine]
                def x = line.indexOf("[") + 1
                def y = line.indexOf("]")
                def values = (x > 0 && y > x) ? line.substring(x, y).split(',') : new String[0]
                if (values.length > 1) {
                    // lineBefore keeps '[' and lineAfter keeps ']'; ReplaceText strips them later
                    def lineBefore = line.substring(0, x)
                    def lineAfter = line.substring(y)
                    values.each { value ->
                        w << lineBefore << value.trim() << lineAfter << '\n'
                    }
                } else {
                    // No list, or a single value: pass the line through unchanged
                    w << line << '\n'
                }
            }
        }
    } as StreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Error splitting bracketed Class values', e)
    session.transfer(flowFile, REL_FAILURE)
}
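
The per-line logic can also be checked outside NiFi. Below is a minimal sketch in plain JavaScript rather than Groovy; the function name `splitBracketedLine` is hypothetical, and unlike the script above it drops the brackets itself, so it assumes no later ReplaceText step.

```javascript
// Hypothetical helper: expands one CSV line that contains a bracketed list
// into one line per list element. Lines without a list come back unchanged.
function splitBracketedLine(line) {
    var open = line.indexOf('[');
    var close = line.indexOf(']');
    if (open < 0 || close <= open) {
        return [line];
    }
    var before = line.substring(0, open);   // text up to, but excluding, '['
    var after = line.substring(close + 1);  // text after ']'
    return line.substring(open + 1, close).split(',').map(function (value) {
        return before + value.trim() + after;
    });
}

console.log(splitBracketedLine('1, Kevin,[Eight, Nine]').join('\n'));
console.log(splitBracketedLine('2, Mark,Four').join('\n'));
```

Trimming each element avoids a stray leading space when the list is written as "Eight, Nine".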

Upvotes: 0

Rony

Reputation: 173

  • The CSV file is not valid as given: the array field contains a comma, so it must be enclosed in double quotes. A ReplaceText processor can replace '[' with '"[' and another can replace ']' with ']"'; I used two ReplaceText processors here.


The output is now like this:

Id,Name,Class
1,Kevin,"[Eight,Nine]"
2,Mark,Four
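
The effect of the two replacements can be previewed outside NiFi. This is a minimal sketch in plain JavaScript, assuming brackets only ever appear around the list field; `quoteBracketedFields` is a hypothetical name, not a NiFi API.

```javascript
// Hypothetical helper mirroring the two ReplaceText steps:
// '[' becomes '"[' and ']' becomes ']"'.
function quoteBracketedFields(csvText) {
    return csvText
        .replace(/\[/g, '"[')
        .replace(/\]/g, ']"');
}

var sample = 'Id,Name,Class\n1,Kevin,[Eight,Nine]\n2,Mark,Four';
console.log(quoteBracketedFields(sample));
```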
  • Next, convert the CSV data into JSON. I used a ConvertRecord processor here.

Output:

[ {
    "Id" : 1,
    "Name" : "Kevin",
    "Class" : "[Eight,Nine]"
}, {
    "Id" : 2,
    "Name" : "Mark",
    "Class" : "Four"
} ]
  • Pass this JSON to an ExecuteScript processor to split the "Class" array. Here is the ECMAScript I used:

Code:

var flowFile = session.get();
if (flowFile != null) {

    var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback")
    var IOUtils = Java.type("org.apache.commons.io.IOUtils")
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

    flowFile = session.write(flowFile,
        new StreamCallback(function(inputStream, outputStream) {
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            var json = JSON.parse(text);
            
            // Output array
            var newArray = [];

            for (var index in json) {
                var obj = json[index];
                
                // if the Class has '[' at the beginning
                if (obj['Class'].indexOf('[') == 0) {
                    var rightBracket = obj['Class'].indexOf(']');

                    // Get the value of Class without brackets
                    var classValue = obj['Class'].substring(1, rightBracket);

                    // We split the value with comma
                    var values = classValue.split(',');

                    // We push each value of the class in the output array
                    for (var i in values) {
                        newArray.push({
                            "Id": obj['Id'],
                            "Name": obj['Name'],
                            "Class": values[i]
                        });
                    }
                } else {
                    // Normal entry, Class is not an array
                    newArray.push(obj);
                }
            }

            outputStream.write(JSON.stringify(newArray, null, '\t').getBytes(StandardCharsets.UTF_8));
        }
    ));
    session.transfer(flowFile, REL_SUCCESS);
}

Output:

[
    {
        "Id": 1,
        "Name": "Kevin",
        "Class": "Eight"
    },
    {
        "Id": 1,
        "Name": "Kevin",
        "Class": "Nine"
    },
    {
        "Id": 2,
        "Name": "Mark",
        "Class": "Four"
    }
]
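
The record-splitting step can be tested without a NiFi session by isolating it as a pure function. This is a minimal sketch, assuming every record has Id, Name and Class fields as in the example; `explodeClass` is a hypothetical name.

```javascript
// Hypothetical helper: turns each record whose Class looks like "[a,b]"
// into one record per element; other records pass through unchanged.
function explodeClass(records) {
    var result = [];
    records.forEach(function (record) {
        var value = String(record.Class);
        if (value.charAt(0) === '[' && value.charAt(value.length - 1) === ']') {
            value.slice(1, -1).split(',').forEach(function (item) {
                result.push({ Id: record.Id, Name: record.Name, Class: item.trim() });
            });
        } else {
            result.push(record);
        }
    });
    return result;
}

var input = [
    { Id: 1, Name: 'Kevin', Class: '[Eight,Nine]' },
    { Id: 2, Name: 'Mark', Class: 'Four' }
];
console.log(JSON.stringify(explodeClass(input), null, '\t'));
```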
  • Finally, convert this JSON back to CSV with another ConvertRecord processor.

Output:

Id,Name,Class
1,Kevin,Eight
1,Kevin,Nine
2,Mark,Four

Upvotes: 1
