Stolas

Sub-records in Avro with Morphlines

I'm trying to convert JSON into Avro using the kite-sdk morphline module. After some experimenting, I was able to convert the JSON into Avro using a simple schema (no complex data types).

Then I took it one step further and modified the Avro schema as shown below (subrec.avsc). As you can see, the schema contains a subrecord.

As soon as I tried to convert the JSON to Avro using morphlines.conf and subrec.avsc, the conversion failed.

Somehow the fields extracted to paths such as "/record_type[]/alert/action" are not translated by the toAvro command.

The morphlines.conf

morphlines : [
   {
   id : morphline1
   importCommands : ["org.kitesdk.**"]

   commands : [
      # Read the JSON blob
      { readJson: {} }

      { logError { format : "record: {}", args : ["@{}"] } }

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
              "/record_type[]/alert/action" : /alert/action,
              "/record_type[]/alert/signature_id" : /alert/signature_id,
              "/record_type[]/alert/signature" : /alert/signature,
              "/record_type[]/alert/category" : /alert/category,
              "/record_type[]/alert/severity" : /alert/severity
      } } }

      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

      { extractJsonPaths { flatten: false, paths: {
              timestamp : /timestamp,
              event_type : /event_type,
              source_ip : /src_ip,
              source_port : /src_port,
              destination_ip : /dest_ip,
              destination_port : /dest_port,
              protocol : /proto,
      } } }

      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO"} }

      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }

      # Create Avro container
      { logError { format : "WE GO TO BINARY"} }
      { writeAvroToByteArray { format: containerlessBinary } }

      { logError { format : "DONE!!!"} }
   ]
   }
]

And the subrec.avsc

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name": "record_type",
    "type" : ["null", {
      "name" : "alert",
      "type" : "record",
      "fields" : [ {
            "name" : "action",
            "type" : "string"
        }, {
            "name" : "signature_id",
            "type" : "int"
        }, {
            "name" : "signature",
            "type" : "string"
        }, {
            "name" : "category",
            "type" : "string"
        }, {
            "name" : "severity",
            "type" : "int"
        }
      ] } ]
  } ]
}

The { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } } command logs the following:

[{
    /record_type[]/alert/action=[allowed],
    /record_type[]/alert/category=[],
    /record_type[]/alert/severity=[3],
    /record_type[]/alert/signature=[GeoIP from NL, Netherlands],
    /record_type[]/alert/signature_id=[88006],
    _attachment_body=[{
            "timestamp": "2015-03-23T07:42:01.303046",
            "event_type": "alert",
            "src_ip": "1.1.1.1",
            "src_port": 18192,
            "dest_ip": "46.231.41.166",
            "dest_port": 62004,
            "proto": "TCP",
            "alert": {
                "action": "allowed",
                "gid": "1",
                "signature_id": "88006",
                "rev": "1",
                "signature": "GeoIP from NL, Netherlands ",
                "category": "",
                "severity": "3"
                }
            }],
    _attachment_mimetype=[json/java+memory],
    basename=[simple_eve.json]
}]
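The log above suggests one plausible reason toAvro finds nothing for record_type. Each extractJsonPaths output name is stored verbatim as a top-level key, slashes included, so there is no field literally called "record_type". The Java sketch below models this under stated assumptions. A plain Map<String, List<Object>> stands in for the morphline record, which is not Kite's Record class. The nest helper is hypothetical and not part of the Kite SDK. It only shows the kind of nested Map a record-typed field would need, not how toAvro actually consumes it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model (an assumption, not Kite's Record class) of the record
 * produced by the question's extractJsonPaths step: each output field name
 * is stored verbatim as a key, slashes included.
 */
public class FlatKeysDemo {

    /** Builds part of the record logged under "EXTRACTED THIS". */
    static Map<String, List<Object>> extractedRecord() {
        Map<String, List<Object>> rec = new LinkedHashMap<>();
        rec.put("/record_type[]/alert/action", List.of("allowed"));
        rec.put("/record_type[]/alert/severity", List.of("3"));
        rec.put("/record_type[]/alert/signature_id", List.of("88006"));
        rec.put("_attachment_mimetype", List.of("json/java+memory"));
        return rec;
    }

    /**
     * Hypothetical helper (not in the Kite SDK): groups keys shaped like
     * "/<field>[]/<sub>/<leaf>" into one nested Map per top-level field,
     * keeping only the first value of each key.
     */
    static Map<String, Map<String, Object>> nest(Map<String, List<Object>> flat) {
        Map<String, Map<String, Object>> nested = new LinkedHashMap<>();
        for (Map.Entry<String, List<Object>> e : flat.entrySet()) {
            // "/record_type[]/alert/action" splits into ["", "record_type[]", "alert", "action"]
            String[] parts = e.getKey().split("/");
            if (parts.length < 3 || e.getValue().isEmpty()) {
                continue; // plain keys such as "_attachment_mimetype" are skipped
            }
            String top = parts[1].replace("[]", "");
            String leaf = parts[parts.length - 1];
            nested.computeIfAbsent(top, k -> new LinkedHashMap<>())
                  .put(leaf, e.getValue().get(0));
        }
        return nested;
    }

    public static void main(String[] args) {
        Map<String, List<Object>> rec = extractedRecord();
        // A lookup by the Avro field name fails: the key is the literal path string.
        System.out.println(rec.containsKey("record_type"));  // false
        System.out.println(nest(rec).get("record_type"));    // {action=allowed, severity=3, signature_id=88006}
    }
}
```

Under this model, the fix would be to get a Map-shaped value under the name the schema expects, rather than several flat, slash-named fields.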


Answers (1)

davy jones

UPDATE 2017-06-22

You MUST populate the data in the structure for this to work, using addValues or setValues:

{
    addValues {
        micDefaultHeader : [
            {
                eventTimestampString : "2017-06-22 18:18:36"
            }
        ]
    }
}

After debugging the source of the morphline toAvro command, it appears that the record is the first object to be evaluated, no matter what you put in your mappings structure.

The solution is quite simple, but finding it took some extra time: Eclipse, running the Flume agent in debug mode, cloning the source code, and lots of coffee.

Here it goes.

My schema:

{
  "type" : "record",
  "name" : "co_lowbalance_event",
  "namespace" : "co.tigo.billing.cboss.lowBalance",
  "fields" : [ {
    "name" : "dummyValue",
    "type" : "string",
    "default" : "dummy"
  }, {
    "name" : "micDefaultHeader",
    "type" : {
      "type" : "record",
      "name" : "mic_default_header_v_1_0",
      "namespace" : "com.millicom.schemas.root.struct",
      "doc" : "standard millicom header definition",
      "fields" : [ {
        "name" : "eventTimestampString",
        "type" : "string",
        "default" : "12345678910"
      } ]
    }
  } ]
}

Morphlines file:

morphlines : [
        {
                id : convertJsonToAvro
                importCommands : ["org.kitesdk.**"]
                commands : [
                        {
                                readJson {
                                        outputClass : java.util.Map
                                }
                        }

                        {
                                addValues {
                                   micDefaultHeader : [{}]
                                }
                        }


                        {
                                logDebug { format : "my record: {}", args : ["@{}"] } 
                        }


                        {
                                toAvro {
                                        schemaFile : /home/asarubbi/Development/test/co_lowbalance_event.avsc
                                        mappings : {
                                                "micDefaultHeader" : micDefaultHeader
                                                "micDefaultHeader/eventTimestampString" : eventTimestampString
                                        }

                                }
                        }


                        {
                                writeAvroToByteArray {
                                        format : containerlessJSON
                                        codec : null
                                }
                        }
                ]
        }
]

The magic lies here:

{
   addValues {
      micDefaultHeader : [{}]
   }
}

And in the mappings:

mappings : {
    "micDefaultHeader" : micDefaultHeader
    "micDefaultHeader/eventTimestampString" : eventTimestampString
}

Explanation:

Inside the code, the first field name that is evaluated is micDefaultHeader, of type RECORD. As this schema does not specify a default value for the RECORD field, the toAvro code evaluates it and finds no value configured in mappings. It then fails, because it (wrongly) concludes that the record is empty when it shouldn't be.

However, looking at the code, you can see that it only requires a Map object, even one containing no values, to satisfy the parser and continue to the next element.

So we add a Map object using addValues and fill it with an empty map: [{}]. Note that the field name must match the name of the record that is producing the empty value; in my case, "micDefaultHeader".

Feel free to comment if you have a better solution, as this looks like a "dirty fix".
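The Java sketch below models the behaviour described in this answer in plain Java. It is not taken from the Kite source. convertHeader is a hypothetical stand-in for the way toAvro handles the RECORD field micDefaultHeader. Following the explanation above, it assumes the field's first value must be a Map. Following the 2017 update, it also assumes the nested eventTimestampString must actually be populated; schema defaults are deliberately not modelled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the RECORD-field handling described in this answer.
 * NOT the Kite SDK source: convertHeader and its messages are invented.
 */
public class RecordFieldDemo {

    static String convertHeader(Map<String, List<Object>> record) {
        List<Object> values = record.getOrDefault("micDefaultHeader", List.of());
        // Stage 1 (the original explanation): a RECORD-typed field needs a Map value.
        if (values.isEmpty() || !(values.get(0) instanceof Map)) {
            return "FAIL: micDefaultHeader is not a Map";
        }
        Map<?, ?> source = (Map<?, ?>) values.get(0);
        // Stage 2 (the 2017 update): sub-fields must be populated; this model
        // assumes schema defaults are NOT filled in for them.
        Object ts = source.get("eventTimestampString");
        if (ts == null) {
            return "FAIL: eventTimestampString missing";
        }
        return "OK: eventTimestampString=" + ts;
    }

    public static void main(String[] args) {
        Map<String, List<Object>> rec = new LinkedHashMap<>();
        System.out.println(convertHeader(rec));  // FAIL: micDefaultHeader is not a Map

        // Like addValues { micDefaultHeader : [{}] }
        rec.put("micDefaultHeader", List.of(Map.of()));
        System.out.println(convertHeader(rec));  // FAIL: eventTimestampString missing

        // Like the populated addValues from the update
        rec.put("micDefaultHeader", List.of(Map.of("eventTimestampString", "2017-06-22 18:18:36")));
        System.out.println(convertHeader(rec));  // OK: eventTimestampString=2017-06-22 18:18:36
    }
}
```

Under this model, the empty map [{}] only gets past the record-level check. That would be consistent with the update's advice to fill the structure with addValues or setValues.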
