Pankaj Ramteke
Pankaj Ramteke

Reputation: 1

Morphline config file not indexing nested Avro data

I am generating a Solr index for my Avro data. Index entries are only generated for data elements at the root level, not for nested ones. Below is a sample of the schema (not all of it is included).

My Avro Schema is as below.

{
  "type" : "record",
  "name" : "abcd",
  "namespace" : "xyz",
  "doc" : "Schema Definition for Low Fare Search Shopping Request/Response Data",
  "fields" : [ {
    "name" : "ShopID",
    "type" : "string"
  }, {
    "name" : "RqSysTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "RqTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "RsSysTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "RsTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "Request",
    "type" : {
      "type" : "record",
      "name" : "RequestStruct",
      "fields" : [ {
        "name" : "TransactionID",
        "type" : [ "string", "null" ]
      }, {
        "name" : "AgentSine",
        "type" : [ "string", "null" ]
      }, {
        "name" : "CabinPref",
        "type" : [ {
          "type" : "array",
          "items" : {
            "type" : "record",
            "name" : "CabinStruct",
            "fields" : [ {
              "name" : "Cabin",
              "type" : [ "string", "null" ]
            }, {
              "name" : "PrefLevel",
              "type" : [ "string", "null" ]
            } ]
          }
        }, "null" ]
      }, {
        "name" : "CountryCode",
        "type" : [ "string", "null" ]
      }, {
        "name" : "PassengerStatus",
        "type" : [ "string", "null" ]
      } ]
    }
  } ]
}

How do I refer to "TransactionID" in my morphline config file? I tried all the options, but no index is generated for the nested data elements.

Below is a sample of my morphline config file.

# Maps output field names (left) to paths into the Avro record (right).
# The snippet is an excerpt: the closing braces of `paths` and
# `extractAvroPaths` are not shown.
extractAvroPaths {
          flatten : true
          paths : { 
        # Root-level fields of the schema; these are the ones reported as indexed.
        ShopID : /ShopID
                RqSysTimestamp : /RqSysTimestamp
                RqTimestamp : /RqTimestamp
                RsSysTimestamp :/RsSysTimestamp
                RsTimestamp : /RsTimestamp
                # NOTE(review): in the schema shown in the question, "Request" is the field
                # name and "RequestStruct" is only the name of that field's record type.
                # Path steps presumably follow field names, so /Request/TransactionID and
                # /Request/AgentSine are the likely intended paths -- confirm against the
                # Kite Morphlines extractAvroPaths documentation.
                TransactionID : "/Request/RequestStruct/TransactionID"
                AgentSine : "/Request/RequestStruct/AgentSine"
                # NOTE(review): Cabin and PrefLevel are declared inside the array held by
                # Request.CabinPref, and CountryCode inside Request, yet the paths below
                # address them as root-level fields. Something like /Request/CabinPref[]/Cabin
                # and /Request/CountryCode is presumably needed -- verify the array syntax.
                Cabin :/Cabin
                PrefLevel :/PrefLevel
                CountryCode :/CountryCode
                # NOTE(review): FrequentFlyerStatus does not appear in the schema excerpt shown
                # in the question; it may be defined in the omitted part of the schema.
                FrequentFlyerStatus :/FrequentFlyerStatus

Upvotes: 0

Views: 193

Answers (1)

tom lee
tom lee

Reputation: 1

The toAvro command expects a java.util.Map as input when converting to a nested Avro record, so this is my solution.

# Morphline that turns a JSON event body into a containerless binary Avro record.
# Pipeline: readJson -> java (copy top-level JSON fields onto the record as
# plain Java values) -> toAvro -> writeAvroToByteArray.
morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }

      # Re-read the JSON body as a java.util.Map and copy every top-level entry
      # onto the record, so nested JSON objects reach toAvro as nested Maps.
      {
        java {
          imports : """
            import com.fasterxml.jackson.databind.ObjectMapper;
            import org.kitesdk.morphline.base.Fields;
            import java.io.IOException;
            import java.util.Map;
          """

          code : """
            Object body = record.getFirstValue(Fields.ATTACHMENT_BODY);
            if (body == null) {
              // Nothing to convert; fail this record instead of throwing an NPE.
              logger.warn("Record has no attachment body; cannot convert to Avro");
              return false;
            }

            Map<String, Object> fields;
            try {
              fields = (Map<String, Object>) new ObjectMapper().readValue(body.toString(), Map.class);
            } catch (IOException e) {
              // Malformed JSON or a non-object root (e.g. an array): report and fail
              // the record rather than continuing with a null map.
              logger.warn("Cannot parse attachment body as a JSON object", e);
              return false;
            }
            if (fields == null) {
              // A JSON literal `null` deserializes to a null Map.
              logger.warn("Attachment body is JSON null; cannot convert to Avro");
              return false;
            }

            for (Map.Entry<String, Object> entry : fields.entrySet()) {
              record.put(entry.getKey(), entry.getValue());
            }
            return child.process(record);
          """
        }
      }

      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
      } }

      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }

      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
      } }

    ]
  }
]

Upvotes: 0

Related Questions