OpenHaus
OpenHaus

Reputation: 107

Elasticsearch: Tricky aggregation with sum and comparison

I am trying to pull statistics from my Elasticsearch cluster, but I cannot figure out how to build the query.

In the end what I want to achieve is a count of streams (field: status) over time (field: timestamp) for a specific item (field: media).

The data are logs from nginx with anonymized IPs (field: ip_hash) and user agents (field: http_user_agent). To get a valid count I need to sum up the bytes transferred (field: bytes_sent) and compare that to a minimum threshold (integer) considering the same IP and user agent. It is only a valid stream / only counts if XY bytes of that stream have been transferred in sum.

"_source": {
    "media": "my-stream.001",
    "http_user_agent": "Spotify/8.4.44 Android/29 (SM-T535)",
    "ip_hash": "fcd2653c44c1d8e33ef5d58ac5a33c2599b68f05d55270a8946166373d79a8212a49f75bcf3f71a62b9c71d3206c6343430a9ebec9c062a0b308a48838161ce8",
    "timestamp": "2022-02-05 01:32:23.941",
    "bytes_sent": 4893480,
    "status": 206
}

Where I am having trouble is summing up the transferred bytes per unique user agent / IP hash combination and comparing that sum to the threshold.

Any pointers on how I could solve this are appreciated. Thank you!

So far I got this:

GET /logdata_*/_search
    {
      "size": 0,
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "timestamp": { "gte": "now-1w/d", "lt": "now/d" }
              }
            }
          ]
        }
      },
      "aggs": {
        "status206": {
          "filter": {
            "term": { "status": "206" }
          },
          "aggs": {
            "medias": {
              "terms": { "field": "media", "size": 10 },
              "aggs": {
                "ips": {
                  "terms": { "field": "ip_hash", "size": 10 },
                  "aggs": {
                    "clients": {
                      "terms": { "field": "http_user_agent", "size": 10 },
                      "aggs": {
                        "transferred": {
                          "sum": { "field": "bytes_sent" }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }

Which gives something like this:

{
  "took" : 1563,
  "timed_out" : false,
  "_shards" : {
    "total" : 12,
    "successful" : 12,
    "skipped" : 8,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "status206" : {
      "doc_count" : 1307130,
      "medias" : {
        "doc_count_error_upper_bound" : 7612,
        "sum_other_doc_count" : 1163149,
        "buckets" : [
          {
            "key" : "20220402_ETD_Podcast_2234_Eliten_-_VD_Hanson.mp3",
            "doc_count" : 21772,
            "ips" : {
              "doc_count_error_upper_bound" : 12,
              "sum_other_doc_count" : 21574,
              "buckets" : [
                {
                  "key" : "ae55a10beda61afd3641fe2a6ca8470262d5a0c07040d3b9b8285ea1a4dba661a0502a7974dc5a4fecbfbbe5b7c81544cdcea126271533e724feb3d7750913a5",
                  "doc_count" : 38,
                  "clients" : {
                    "doc_count_error_upper_bound" : 0,
                    "sum_other_doc_count" : 0,
                    "buckets" : [
                      {
                        "key" : "Deezer/7.0.0.xxx (Android; 10; Mobile; de) samsung SM-G960F",
                        "doc_count" : 38,
                        "transferred" : {
                          "value" : 7582635.0
                        }
                      }
                    ]
                  }
                },
                {
                  "key" : "60082e96eb57c4a8b7962dc623ef7446fbc08cea676e75c4ff94ab5324dec93a6db1848d45f6dcc6e7acbcb700bb891cf6bee66e1aa98fc228107104176734ff",
                  "doc_count" : 37,
                  "clients" : {
                    "doc_count_error_upper_bound" : 0,
                    "sum_other_doc_count" : 0,
                    "buckets" : [
                      {
                        "key" : "Deezer/7.0.0.xxx (Android; 12; Mobile; de) samsung SM-N770F",
                        "doc_count" : 36,
                        "transferred" : {
                          "value" : 7252448.0
                        }
                      },
                      {
                        "key" : "Mozilla/5.0 (Linux; Android 11; RMX2063) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.58 Mobile Safari/537.36",
                        "doc_count" : 1,
                        "transferred" : {
                          "value" : 843367.0
                        }
                      }
                    ]
                  }
                },
          

Now I would need to check that "transferred" is gte the threshold; each user agent / IP hash bucket that passes would count as 1 stream. In the end I need the count of all applicable streams.

Upvotes: 4

Views: 1236

Answers (1)

YoavBZ
YoavBZ

Reputation: 198

You can try the following:

GET _search?filter_path=aggregations.valid_streams.count
{
    "size": 0,
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "timestamp": {
                            "gte": "now-1w/d",
                            "lt": "now/d"
                        }
                    }
                },
                {
                    "term": {
                        "status": 206
                    }
                }
            ]
        }
    },
    "aggs": {
        "streams": {
            "multi_terms": {
                "size": 65536,
                "terms": [
                    {
                        "field": "media"
                    },
                    {
                        "field": "ip_hash"
                    },
                    {
                        "field": "http_user_agent"
                    }
                ]
            },
            "aggs": {
                "transferred": {
                    "sum": {
                        "field": "bytes_sent"
                    }
                },
                "threshold": {
                    "bucket_selector": {
                        "buckets_path": {
                            "total": "transferred"
                        },
                        "script": "params.total >= 12345"
                    }
                }
            }
        },
        "valid_streams": {
            "stats_bucket": {
                "buckets_path": "streams>transferred"
            }
        }
    }
}

Explanation:

  1. streams - a multi_terms aggregation that combines media, ip_hash and http_user_agent, since every distinct combination of these fields should be counted as a separate stream. This is mainly for better readability; change it if it doesn't fit your logic.
  2. transferred - sum aggregation to sum up the sent bytes.
  3. threshold - bucket_selector aggregation which filters out the streams that didn't reach the XY threshold.
  4. valid_streams - stats_bucket aggregation which returns a count field containing the number of remaining buckets, i.e. the number of valid streams. BTW, it also gives you other statistics about your valid streams (e.g. the average bytes transferred).
  5. The filter_path query parameter is used to reduce the returned response to only include the aggregation output.

Upvotes: 1

Related Questions