shiva

Group data in Elasticsearch by the same value across two keys

I have just started learning Elasticsearch and am stuck on a group-by aggregation. I have a data set in Elasticsearch like this:

[{
    "srcIP": "10.0.11.12",
    "dstIP": "19.67.78.91",
    "totalMB": "0.25"
},{
    "srcIP": "10.45.11.62",
    "dstIP": "19.67.78.91",
    "totalMB": "0.50"
},{
    "srcIP": "13.67.52.91",
    "dstIP": "10.0.11.12",
    "totalMB": "0.75"
},{
    "srcIP": "10.23.64.12",
    "dstIP": "10.45.11.62",
    "totalMB": "0.25"
}]

I want to group the data by srcIP and sum the totalMB field. On top of that, while grouping by srcIP, I want to match each srcIP value against the dstIP values and also sum totalMB for the documents where that IP appears as dstIP.

The output should look like this:

"buckets": [{
    "key": "10.0.11.12",
    "total_MB_SrcIp": {
        "value": "0.25"
    },
    "total_MB_dstIP": {
        "value": "0.75"
    }
},
{
    "key": "10.45.11.62",
    "total_MB_SrcIp": {
        "value": "0.50"
    },
    "total_MB_dstIP": {
        "value": "0.25"
    }
}]
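
To make the intended semantics concrete, here is a minimal plain-Python sketch over the sample documents above, assuming totalMB is parsed as a number. It creates one bucket per distinct srcIP (as a terms aggregation on srcIP would), sums totalMB for that IP as source, and separately sums totalMB for the documents whose dstIP equals the same IP. The bucket key names are taken from the expected output.

```python
from collections import defaultdict

# Sample documents from the question, with totalMB parsed as a number.
docs = [
    {"srcIP": "10.0.11.12", "dstIP": "19.67.78.91", "totalMB": 0.25},
    {"srcIP": "10.45.11.62", "dstIP": "19.67.78.91", "totalMB": 0.50},
    {"srcIP": "13.67.52.91", "dstIP": "10.0.11.12", "totalMB": 0.75},
    {"srcIP": "10.23.64.12", "dstIP": "10.45.11.62", "totalMB": 0.25},
]

def expected_buckets(docs):
    """One bucket per distinct srcIP, with the totalMB it sent and the
    totalMB of the documents whose dstIP is that same IP."""
    src_sum = defaultdict(float)
    dst_sum = defaultdict(float)
    for d in docs:
        src_sum[d["srcIP"]] += d["totalMB"]
        dst_sum[d["dstIP"]] += d["totalMB"]
    buckets = []
    for ip, total in src_sum.items():
        buckets.append({
            "key": ip,
            "total_MB_SrcIp": {"value": total},
            # IPs that never appear as dstIP get 0.0 on the destination side.
            "total_MB_dstIP": {"value": dst_sum.get(ip, 0.0)},
        })
    # Mirror "order": {"total_MB_SrcIp": "desc"} from the query below.
    buckets.sort(key=lambda b: b["total_MB_SrcIp"]["value"], reverse=True)
    return buckets

for bucket in expected_buckets(docs):
    print(bucket)
```

Note that 19.67.78.91 gets no bucket here, because it only ever appears as dstIP.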

I have done a normal aggregation on one key, but I couldn't work out the full query for my problem. Query:

GET /index*/_search
{
  "size": 0,
  "aggs": {
    "group_by_srcIP": {
      "terms": {
        "field": "srcIP",
        "size": 100,
        "order": {
          "total_MB_SrcIp": "desc"
        }
      },
      "aggs": {
        "total_MB_SrcIp": {
          "sum": {
            "field": "totalMB"
          }
        }
      }
    }
  }
}

I hope the sample output makes the problem clear. Thanks in advance.

Answers (1)

ms_27

As I understand it, you need a sum aggregation on one field (totalMB) for each distinct IP value across two other fields (srcIP and dstIP).

AFAIK, Elasticsearch is not well suited to aggregating on the values of multiple fields unless you combine those fields at ingestion time or on the application side. (I may be wrong here, though.)
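
If you go the ingestion route instead, one option (an assumption, not something I tested) is to index each flow as two documents, one per role, so that an ordinary terms aggregation can do the grouping. The sketch below uses hypothetical 'ip' and 'role' fields, builds the request body as a Python dict without sending it, and computes the same totals locally to show what that aggregation would return.

```python
from collections import defaultdict

# Flow documents as in the question (totalMB arrives as a string).
flows = [
    {"srcIP": "10.0.11.12", "dstIP": "19.67.78.91", "totalMB": "0.25"},
    {"srcIP": "10.45.11.62", "dstIP": "19.67.78.91", "totalMB": "0.50"},
    {"srcIP": "13.67.52.91", "dstIP": "10.0.11.12", "totalMB": "0.75"},
    {"srcIP": "10.23.64.12", "dstIP": "10.45.11.62", "totalMB": "0.25"},
]

def split_by_role(flow):
    """Turn one flow into two documents with hypothetical 'ip' and 'role' fields."""
    mb = float(flow["totalMB"])
    return [
        {"ip": flow["srcIP"], "role": "src", "totalMB": mb},
        {"ip": flow["dstIP"], "role": "dst", "totalMB": mb},
    ]

# Request body for an index holding the role documents
# (assumes 'ip' is mapped as ip or keyword and 'role' as keyword).
agg_body = {
    "size": 0,
    "aggs": {
        "by_ip": {
            "terms": {"field": "ip", "size": 100},
            "aggs": {
                "by_role": {
                    "terms": {"field": "role"},
                    "aggs": {"total_MB": {"sum": {"field": "totalMB"}}},
                }
            },
        }
    },
}

def sum_by_ip_and_role(role_docs):
    """Local equivalent of agg_body: per-IP totals, split by role."""
    totals = defaultdict(lambda: {"src": 0.0, "dst": 0.0})
    for d in role_docs:
        totals[d["ip"]][d["role"]] += d["totalMB"]
    return dict(totals)

role_docs = [doc for flow in flows for doc in split_by_role(flow)]
totals = sum_by_ip_and_role(role_docs)
print(totals["10.0.11.12"])  # {'src': 0.25, 'dst': 0.75}
```

The trade-off is doubling the document count in exchange for being able to use plain terms and sum aggregations instead of a script.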

I tried to get the required output using a scripted_metric aggregation. (If you don't know what it is or how it works, please read its documentation first.)

I experimented with a Painless script that does the following in the aggregation:

  1. Pick srcIP, dstIP and totalMB from each document.
  2. Populate a cross-mapping of the form IP -> { srcMB: sum of totalMB as source, dstMB: sum of totalMB as destination } in a map.
  3. Return this map as the result of the aggregation.
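
Here is a plain-Python sketch of how those three steps map onto the init, map, combine and reduce phases of scripted_metric, run over the four sample documents split across two hypothetical shards. The function names are illustrative only, not an Elasticsearch API. In Elasticsearch the combine phase runs once per shard, and the reduce phase merges the per-shard results on the coordinating node.

```python
# Four sample documents, split across two hypothetical shards.
shards = [
    [
        {"srcIP": "10.0.11.12", "dstIP": "19.67.78.91", "totalMB": 0.25},
        {"srcIP": "13.67.52.91", "dstIP": "10.0.11.12", "totalMB": 0.75},
    ],
    [
        {"srcIP": "10.45.11.62", "dstIP": "19.67.78.91", "totalMB": 0.50},
        {"srcIP": "10.23.64.12", "dstIP": "10.45.11.62", "totalMB": 0.25},
    ],
]

def init_state():
    # init_script: an empty list of (srcIP, dstIP, totalMB) tuples per shard.
    return {"addrs": []}

def map_doc(state, doc):
    # map_script, step 1: pick srcIP, dstIP and totalMB from each document.
    state["addrs"].append((doc["srcIP"], doc["dstIP"], doc["totalMB"]))

def combine(state):
    # combine_script, step 2: per-shard map IP -> {"srcMB": ..., "dstMB": ...}.
    ip_map = {}
    for src, dst, mb in state["addrs"]:
        ip_map.setdefault(src, {"srcMB": 0.0, "dstMB": 0.0})["srcMB"] += mb
        ip_map.setdefault(dst, {"srcMB": 0.0, "dstMB": 0.0})["dstMB"] += mb
    return ip_map

def reduce_maps(shard_maps):
    # reduce_script, step 3: merge the per-shard maps into one result.
    result = {}
    for ip_map in shard_maps:
        for ip, sums in ip_map.items():
            merged = result.setdefault(ip, {"srcMB": 0.0, "dstMB": 0.0})
            merged["srcMB"] += sums["srcMB"]
            merged["dstMB"] += sums["dstMB"]
    return result

def run_scripted_metric(shards):
    shard_maps = []
    for shard_docs in shards:
        state = init_state()
        for doc in shard_docs:
            map_doc(state, doc)
        shard_maps.append(combine(state))
    return reduce_maps(shard_maps)

result = run_scripted_metric(shards)
print(result["10.0.11.12"])  # {'srcMB': 0.25, 'dstMB': 0.75}
```

Because the reduce step only adds per-shard sums, the result does not depend on how the documents are distributed across shards.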

Here is the actual search query with the aggregation:

GET /testIndex/testType/_search
{
  "size": 0,
  "aggs": {
    "ip-addr": {
      "scripted_metric": {
        "init_script": "params._agg.addrs = []",
        "map_script": "def lst = []; lst.add(doc.srcIP.value); lst.add(doc.dstIP.value); lst.add(doc.totalMB.value); params._agg.addrs.add(lst);",
        "combine_script": "Map ipMap = new HashMap(); for(entry in params._agg.addrs) { def srcIp = entry.get(0); def dstIp = entry.get(1); def mbs = entry.get(2); if(ipMap.containsKey(srcIp)) {def srcMbSum = mbs + ipMap.get(srcIp).get('srcMB'); ipMap.get(srcIp).put('srcMB',srcMbSum); } else {Map types = new HashMap(); types.put('srcMB', mbs); types.put('dstMB', 0.0); ipMap.put(srcIp, types); } if(ipMap.containsKey(dstIp)) {def dstMbSum = mbs + ipMap.get(dstIp).get('dstMB'); ipMap.get(dstIp).put('dstMB',dstMbSum); } else {Map types = new HashMap(); types.put('srcMB', 0.0); types.put('dstMB', mbs); ipMap.put(dstIp, types); } } return ipMap;",
        "reduce_script": "Map resultMap = new HashMap(); for(ipMap in params._aggs) {for(entry in ipMap.entrySet()) {def ip = entry.getKey(); def srcDestMap = entry.getValue(); if(resultMap.containsKey(ip)) {Map types = new HashMap(); types.put('srcMB', srcDestMap.get('srcMB') + resultMap.get(ip).get('srcMB')); types.put('dstMB', srcDestMap.get('dstMB') + resultMap.get(ip).get('dstMB')); resultMap.put(ip, types); } else {resultMap.put(ip, srcDestMap); } } } return resultMap;"
      }
    }
  }
}

Here are the details of the experiment:

Index mapping:

GET testIndex/_mapping
{
  "testIndex": {
    "mappings": {
      "testType": {
        "dynamic": "true",
        "_all": {
          "enabled": false
        },
        "properties": {
          "dstIP": {
            "type": "ip"
          },
          "srcIP": {
            "type": "ip"
          },
          "totalMB": {
            "type": "double"
          }
        }
      }
    }
  }
}

Sample input:

POST testIndex/testType
{
    "srcIP" : "10.0.11.12",
    "dstIP" : "19.67.78.91",
    "totalMB" : "0.25"
}

POST testIndex/testType
{
    "srcIP" : "10.45.11.62",
    "dstIP" : "19.67.78.91",
    "totalMB" : "0.50"
}

POST testIndex/testType
{
    "srcIP" : "13.67.52.91",
    "dstIP" : "10.0.11.12",
    "totalMB" : "0.75"
}

POST testIndex/testType
{
    "srcIP" : "10.23.64.12",
    "dstIP" : "10.45.11.62",
    "totalMB" : "0.25"
}
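
If you have more than a handful of sample documents, the same input can be loaded with one _bulk request instead of separate POSTs. This is a small sketch that only builds the NDJSON body (an action line followed by a source line per document, with a trailing newline); you would send it with POST _bulk and the header Content-Type: application/x-ndjson. Elasticsearch requires lowercase index names, so the sketch uses 'testindex'; the '_type' entry matches the 5.x-era requests above, and newer versions drop mapping types.

```python
import json

docs = [
    {"srcIP": "10.0.11.12", "dstIP": "19.67.78.91", "totalMB": "0.25"},
    {"srcIP": "10.45.11.62", "dstIP": "19.67.78.91", "totalMB": "0.50"},
    {"srcIP": "13.67.52.91", "dstIP": "10.0.11.12", "totalMB": "0.75"},
    {"srcIP": "10.23.64.12", "dstIP": "10.45.11.62", "totalMB": "0.25"},
]

def bulk_body(index, doc_type, docs):
    """Build an NDJSON _bulk body: one 'index' action line, then the document."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type}}))
        lines.append(json.dumps(doc))
    # The _bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"

body = bulk_body("testindex", "testType", docs)
print(body, end="")
```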

Query output:

{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "ip-addr": {
      "value": {
        "13.67.52.91": {
          "srcMB": 0.75,
          "dstMB": 0
        },
        "10.23.64.12": {
          "srcMB": 0.25,
          "dstMB": 0
        },
        "10.45.11.62": {
          "srcMB": 0.5,
          "dstMB": 0.25
        },
        "19.67.78.91": {
          "srcMB": 0,
          "dstMB": 0.75
        },
        "10.0.11.12": {
          "srcMB": 0.25,
          "dstMB": 0.75
        }
      }
    }
  }
}
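
The script returns a plain map rather than terms-style buckets. If you want the exact shape from the question, one option is to reshape it on the application side. Here is a minimal sketch; the default key names come from the question's expected output, and sorting by the source total mirrors the "order" clause in the question's query.

```python
def to_buckets(value, src_key="total_MB_SrcIp", dst_key="total_MB_dstIP"):
    """Reshape the scripted_metric result map into a list of terms-style buckets."""
    buckets = [
        {"key": ip, src_key: {"value": sums["srcMB"]}, dst_key: {"value": sums["dstMB"]}}
        for ip, sums in value.items()
    ]
    # Sort like "order": {"total_MB_SrcIp": "desc"}; ties keep their map order.
    buckets.sort(key=lambda b: b[src_key]["value"], reverse=True)
    return buckets

# The "value" object from the query output above.
value = {
    "13.67.52.91": {"srcMB": 0.75, "dstMB": 0},
    "10.23.64.12": {"srcMB": 0.25, "dstMB": 0},
    "10.45.11.62": {"srcMB": 0.5, "dstMB": 0.25},
    "19.67.78.91": {"srcMB": 0, "dstMB": 0.75},
    "10.0.11.12": {"srcMB": 0.25, "dstMB": 0.75},
}

for bucket in to_buckets(value):
    print(bucket)
```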

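Here is the same aggregation formatted for readability. The triple-quoted strings are Kibana Console syntax rather than plain JSON, so use the single-line version above with clients such as curl.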

"scripted_metric": {
  "init_script": "params._agg.addrs = []",
  "map_script": """
      def lst = [];
      lst.add(doc.srcIP.value);
      lst.add(doc.dstIP.value);
      lst.add(doc.totalMB.value);
      params._agg.addrs.add(lst);
    """,
  "combine_script": """
      Map ipMap = new HashMap();
      for(entry in params._agg.addrs) { 
        def srcIp = entry.get(0);
        def dstIp = entry.get(1);
        def mbs = entry.get(2);

        if(ipMap.containsKey(srcIp)) { 
          def srcMbSum = mbs + ipMap.get(srcIp).get('srcMB');
          ipMap.get(srcIp).put('srcMB',srcMbSum);
        } else {
          Map types = new HashMap();
          types.put('srcMB', mbs);
          types.put('dstMB', 0.0);
          ipMap.put(srcIp, types);
        }

        if(ipMap.containsKey(dstIp)) {
          def dstMbSum = mbs + ipMap.get(dstIp).get('dstMB');
          ipMap.get(dstIp).put('dstMB',dstMbSum);
        } else {
          Map types = new HashMap();
          types.put('srcMB', 0.0);
          types.put('dstMB', mbs);
          ipMap.put(dstIp, types);
        }
      }
      return ipMap;
    """,
  "reduce_script": """
      Map resultMap = new HashMap();
      for(ipMap in params._aggs) {
        for(entry in ipMap.entrySet()) {
          def ip = entry.getKey();
          def srcDestMap = entry.getValue();

          if(resultMap.containsKey(ip)) {
            Map types = new HashMap();
            types.put('srcMB', srcDestMap.get('srcMB') + resultMap.get(ip).get('srcMB'));
            types.put('dstMB', srcDestMap.get('dstMB') + resultMap.get(ip).get('dstMB'));
            resultMap.put(ip, types);
          } else {
            resultMap.put(ip, srcDestMap);
          } 
        } 
      }
      return resultMap;
    """
}

Before going further, though, I suggest testing it on some sample data to check that it works for you. Scripted metric aggregations can have a considerable impact on query performance.

One more thing: to get the key names from your expected output, replace all occurrences of 'srcMB' and 'dstMB' in the scripts with 'total_MB_SrcIp' and 'total_MB_dstIP' (or whatever names you need).

Hope this helps you or someone else.

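FYI, I tested this on ES v5.6.11. In later versions, 'params._agg' and 'params._aggs' were deprecated (6.4) and then removed (7.0) in favor of 'state' and 'states', so the scripts would need that change on newer clusters.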
