carousallie

Reputation: 865

Extract Value of CSV Column to Add Attribute

I'm working with some CSVs in NiFi and my pipeline is producing some duplicates. As a result, I'd like to use the DetectDuplicate processor, but in order to do this I need to have some attribute that it can compare against to detect duplication. I have an ExtractText processor, and I'd like to use regex to get the value in the SHA1_BASE16 column.

I tried the following regex (suggested by a friend; I don't totally understand it) on the CSV below, but it highlighted the wrong fields and captured some extraneous data. How can I get it to capture only the SHA1_BASE16 value?

RegEx

^[^,]*,[^,]*,[^,]*,[^,]*,[^,]*,([^,]*),[^,]*,[^,]*,[^,]*,[^,]*,[^,]*,([^,]*),[^,]*,[^,]*,([^,]*)\S*

CSV

"USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4"
"dreynolds","1932/04/01 20:23:35 UTC","2016/12/28 20:23:11 UTC","72F20077A79A0D4D90F4C0669FB6EA4BC5953293","FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0","HOLLYWOOD","TWITTER","123.123.123.123"

Actual Output

Match 1
Full match  0-291   "USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4...
Group 1.    66-79   "HASH_SOURCE"
Group 2.    209-251 "FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0"
Group 3.    274-291 "123.123.123.123"

Expected Output

Match 1
Full match  0-291   "USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4...
Group 1.    209-251 "FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0"

Upvotes: 0

Views: 2037

Answers (2)

mattyb

Reputation: 12083

Alternatively, you can use PartitionRecord to split the records into flow files where each record has the same value for the partition field (in this case SHA1_BASE16). It will also set an attribute on the flow file for the partition value, which you can then use in DetectDuplicate.
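
As a sketch of that setup (the property name and the reader/writer services below are assumptions, not something from this answer): PartitionRecord takes a user-defined property whose value is a RecordPath, and it writes each partition's value to a flow file attribute named after that property:

PartitionRecord
    Record Reader : CSVReader           (assumed CSV reader service)
    Record Writer : CSVRecordSetWriter  (assumed CSV writer service)
    sha1          : /SHA1_BASE16        (dynamic property; RecordPath into each record)

Downstream, DetectDuplicate could then use ${sha1} as its Cache Entry Identifier.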

For high-cardinality fields (ones that won't have many duplicates), you may see a performance hit, since there could be a single row in each outgoing flow file; a large number of rows gives you a large number of flow files. Having said that, rather than DetectDuplicate downstream, you could instead use RouteOnAttribute where record.count > 1 (sketched below). That removes the need for a DistributedMapCache.
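
A sketch of that alternative (the property name duplicates is just an example): RouteOnAttribute takes a dynamic property written in NiFi Expression Language, and record.count is the attribute the record writer sets on each outgoing flow file:

RouteOnAttribute
    Routing Strategy : Route to Property name
    duplicates       : ${record.count:gt(1)}

Flow files whose partition holds more than one record go to the duplicates relationship; singletons follow unmatched.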

There is also a contribution to add a DetectDuplicateRecord processor, which I think is what you'd really want here. That contribution is under review, and I hope it makes it into the next release of NiFi.

Upvotes: 1

Emma

Reputation: 27723

I'm guessing that we have two 40-character strings here; we can use the first one as a left boundary and apply this simple expression:

.+"[A-Z0-9]{40}",("[A-Z0-9]{40}").+

where our desired output is in this capturing group:

("[A-Z0-9]{40}")

which we can reference with $1.
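
As a sketch of wiring this into the ExtractText processor from the question (the property name sha1 is just an example): ExtractText takes the regex as a dynamic property and writes each capture group to an attribute named <property>.<group>, so group 1 ends up in sha1.1, which DetectDuplicate could then reference as ${sha1.1}:

ExtractText
    sha1 : .+"[A-Z0-9]{40}",("[A-Z0-9]{40}").+

Note that the captured value keeps its surrounding double quotes.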


Test

const regex = /.+"[A-Z0-9]{40}",("[A-Z0-9]{40}").+/gm;
const str = `"dreynolds","1932/04/01 20:23:35 UTC","2016/12/28 20:23:11 UTC","72F20077A79A0D4D90F4C0669FB6EA4BC5953293","FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0","HOLLYWOOD","TWITTER","123.123.123.123"`;
let m;

while ((m = regex.exec(str)) !== null) {
    // This is necessary to avoid infinite loops with zero-width matches
    if (m.index === regex.lastIndex) {
        regex.lastIndex++;
    }
    
    // The result can be accessed through the `m`-variable.
    m.forEach((match, groupIndex) => {
        console.log(`Found match, group ${groupIndex}: ${match}`);
    });
}

RegEx Circuit

jex.im visualizes regular expressions:


Upvotes: 0
