Ragi Dayananda

Reputation: 25

NiFi MergeRecord processor to merge null values

I am splitting a list of fields and trying to merge them at the end. I have two kinds of fields, standard fields and custom fields, and I process custom fields differently from standard fields.

{
  "standardfield1" : "fieldValue1",
  "customField1" : "customValue"
}

This has to be translated into:

{
  "standardfield1" : "fieldValue1",
  "customFields" : [
    {
      "type" : "customfield",
      "id" : 1212, // this is the id of customField1, retrieved at run time
      "value" : "customValue"
    }
  ]
}

My MergeRecord schema is set to:

{
  "name": "custom field",
  "namespace": "nifi",
  "type": "record",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "value", "type": "string" }
  ]
}

As needed, I copy the content of each standard field into a new flowfile attribute (I can extract it from there later) and leave the flowfile content empty.

Both the custom-field and standard-field flows are connected to the MergeRecord processor.

This works fine as long as custom fields are present in the payload. If there are only standard fields and no custom fields, the MergeRecord processor does not merge anything and does not route to failure either; it just throws a NullPointerException, and the flowfile stays stuck in the queue forever.

I want the MergeRecord processor to merge even flowfiles with empty content.

Any help would be appreciated.

Upvotes: 0

Views: 678

Answers (1)

mattyb

Reputation: 12093

I'm not sure I fully understand your use case, but for your input above, if you have extracted/populated the ID for customField1 into an attribute (let's call it myId), then you could use JoltTransformJSON to get your desired output above, using this Chain spec:

[
  {
    "operation": "shift",
    "spec": {
      "standardfield1": "standardfield1",
      "customField*": {
        "@": "customFields.[&(1,1)].value",
        "#customfield": "customFields.[&(1,1)].type",
        "#${myId}": "customFields.[&(1,1)].id"
      }
    }
  },
  {
    "operation": "remove",
    "spec": {
      "customFields": {
        "0": ""
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "customFields": {
        "*": {
          "id": "=toInteger"
        }
      }
    }
  }
]

This will create the customFields array if there is a customField present, and populate it with the values you have above (including the value of the myId attribute). You can tweak things (like adding a Default spec to the above Chain) to add an empty array for customFields if you wish (to keep the schema happy, e.g.). If I've misunderstood what you're trying to do, please let me know and I will do my best to help.
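
As a rough sketch of the Default spec mentioned above: appending something like the following operation as the last element of the Chain should add an empty customFields array when the input has no custom fields, and leave an existing array untouched. This assumes Jolt's "default" operation accepts an empty array literal as the default value; I have not run it against your flow, so treat it as a starting point.

```json
{
  "operation": "default",
  "spec": {
    "customFields": []
  }
}
```

Placing it last means the earlier "remove" and "modify-overwrite-beta" steps only ever see arrays produced by the shift, and the default is applied only when customFields is still missing.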

Upvotes: 1
