Reputation: 107
I am trying to pull statistics from my Elasticsearch cluster, but I cannot figure out how.
In the end, what I want to achieve is a count of streams (field: status) over time (field: timestamp) for a specific item (field: media).
The data are nginx logs with anonymized IPs (field: ip_hash) and user agents (field: http_user_agent). To get a valid count, I need to sum up the bytes transferred (field: bytes_sent) for the same IP and user agent and compare that sum to a minimum threshold (integer). A stream is only valid, and only counts, if at least XY bytes of it have been transferred in total.
"_source": {
  "media": "my-stream.001",
  "http_user_agent": "Spotify/8.4.44 Android/29 (SM-T535)",
  "ip_hash": "fcd2653c44c1d8e33ef5d58ac5a33c2599b68f05d55270a8946166373d79a8212a49f75bcf3f71a62b9c71d3206c6343430a9ebec9c062a0b308a48838161ce8",
  "timestamp": "2022-02-05 01:32:23.941",
  "bytes_sent": 4893480,
  "status": 206
}
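To make the counting rule concrete, here is a minimal in-memory sketch in Python of what the query ultimately has to compute. It assumes records shaped like the `_source` document shown here and that only status 206 requests count; the function name `count_valid_streams`, the shortened sample values, and the threshold are hypothetical and only serve as illustration.

```python
from collections import defaultdict


def count_valid_streams(records, min_bytes):
    """Count (media, ip_hash, http_user_agent) groups whose summed bytes_sent reach min_bytes."""
    totals = defaultdict(int)
    for rec in records:
        # Assumption: only partial-content responses (206) belong to a stream.
        if rec["status"] != 206:
            continue
        key = (rec["media"], rec["ip_hash"], rec["http_user_agent"])
        totals[key] += rec["bytes_sent"]
    # Every group that reaches the threshold counts as exactly one stream.
    return sum(1 for total in totals.values() if total >= min_bytes)


# Hypothetical sample records (hashes and user agents shortened).
records = [
    {"media": "my-stream.001", "ip_hash": "aaa", "http_user_agent": "Spotify/8.4.44",
     "bytes_sent": 600, "status": 206},
    {"media": "my-stream.001", "ip_hash": "aaa", "http_user_agent": "Spotify/8.4.44",
     "bytes_sent": 500, "status": 206},
    {"media": "my-stream.001", "ip_hash": "bbb", "http_user_agent": "Deezer/7.0.0",
     "bytes_sent": 300, "status": 206},
]

print(count_valid_streams(records, min_bytes=1000))
```

A reference implementation like this is mainly useful for checking the result of an aggregation against a small export of the raw logs.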
Where I am having trouble is summing up the transferred bytes per unique user agent / IP hash combination and comparing that sum to the threshold.
Any pointers on how I could solve this are appreciated. Thank you!
So far I got this:
GET /logdata_*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "timestamp": {
              "gte": "now-1w/d",
              "lt": "now/d"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "status206": {
      "filter": {
        "term": {
          "status": "206"
        }
      },
      "aggs": {
        "medias": {
          "terms": {
            "field": "media",
            "size": 10
          },
          "aggs": {
            "ips": {
              "terms": {
                "field": "ip_hash",
                "size": 10
              },
              "aggs": {
                "clients": {
                  "terms": {
                    "field": "http_user_agent",
                    "size": 10
                  },
                  "aggs": {
                    "transferred": {
                      "sum": {
                        "field": "bytes_sent"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
Which gives something like this:
{
  "took" : 1563,
  "timed_out" : false,
  "_shards" : {
    "total" : 12,
    "successful" : 12,
    "skipped" : 8,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "status206" : {
      "doc_count" : 1307130,
      "medias" : {
        "doc_count_error_upper_bound" : 7612,
        "sum_other_doc_count" : 1163149,
        "buckets" : [
          {
            "key" : "20220402_ETD_Podcast_2234_Eliten_-_VD_Hanson.mp3",
            "doc_count" : 21772,
            "ips" : {
              "doc_count_error_upper_bound" : 12,
              "sum_other_doc_count" : 21574,
              "buckets" : [
                {
                  "key" : "ae55a10beda61afd3641fe2a6ca8470262d5a0c07040d3b9b8285ea1a4dba661a0502a7974dc5a4fecbfbbe5b7c81544cdcea126271533e724feb3d7750913a5",
                  "doc_count" : 38,
                  "clients" : {
                    "doc_count_error_upper_bound" : 0,
                    "sum_other_doc_count" : 0,
                    "buckets" : [
                      {
                        "key" : "Deezer/7.0.0.xxx (Android; 10; Mobile; de) samsung SM-G960F",
                        "doc_count" : 38,
                        "transferred" : {
                          "value" : 7582635.0
                        }
                      }
                    ]
                  }
                },
                {
                  "key" : "60082e96eb57c4a8b7962dc623ef7446fbc08cea676e75c4ff94ab5324dec93a6db1848d45f6dcc6e7acbcb700bb891cf6bee66e1aa98fc228107104176734ff",
                  "doc_count" : 37,
                  "clients" : {
                    "doc_count_error_upper_bound" : 0,
                    "sum_other_doc_count" : 0,
                    "buckets" : [
                      {
                        "key" : "Deezer/7.0.0.xxx (Android; 12; Mobile; de) samsung SM-N770F",
                        "doc_count" : 36,
                        "transferred" : {
                          "value" : 7252448.0
                        }
                      },
                      {
                        "key" : "Mozilla/5.0 (Linux; Android 11; RMX2063) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.58 Mobile Safari/537.36",
                        "doc_count" : 1,
                        "transferred" : {
                          "value" : 843367.0
                        }
                      }
                    ]
                  }
                },
Now I would need to check that "transferred" is gte the threshold; every bucket that passes would count as 1 stream. In the end I need the count of all applicable streams.
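As a stopgap, the check described here could be done client-side by walking the nested buckets of the response. The following Python sketch assumes the response has already been parsed into a dict with the `status206 > medias > ips > clients > transferred` layout of the query in this question; the function name, the shortened bucket keys, and the 1,000,000-byte threshold are hypothetical. Because each `terms` level only returns its top 10 buckets, a non-zero `sum_other_doc_count` means the result would undercount.

```python
def count_valid_streams_per_media(response, min_bytes):
    """Count client buckets whose summed bytes reach min_bytes, grouped by media key."""
    counts = {}
    medias = response["aggregations"]["status206"]["medias"]["buckets"]
    for media in medias:
        valid = 0
        for ip in media["ips"]["buckets"]:
            for client in ip["clients"]["buckets"]:
                # One (ip_hash, http_user_agent) pair counts as one stream.
                if client["transferred"]["value"] >= min_bytes:
                    valid += 1
        counts[media["key"]] = valid
    return counts


# Trimmed sample in the shape of the response; keys are shortened placeholders.
sample = {
    "aggregations": {"status206": {"medias": {"buckets": [
        {"key": "episode.mp3", "ips": {"buckets": [
            {"key": "ip-a", "clients": {"buckets": [
                {"key": "Deezer A", "transferred": {"value": 7582635.0}},
            ]}},
            {"key": "ip-b", "clients": {"buckets": [
                {"key": "Deezer B", "transferred": {"value": 7252448.0}},
                {"key": "Mozilla", "transferred": {"value": 843367.0}},
            ]}},
        ]}},
    ]}}}
}

print(count_valid_streams_per_media(sample, min_bytes=1_000_000))
```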
Upvotes: 4
Views: 1236
Reputation: 198
You can try the following:
GET _search?filter_path=aggregations.valid_streams.count
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "timestamp": {
              "gte": "now-1w/d",
              "lt": "now/d"
            }
          }
        },
        {
          "match": {
            "status": "206"
          }
        }
      ]
    }
  },
  "aggs": {
    "streams": {
      "multi_terms": {
        "size": 65536,
        "terms": [
          {
            "field": "media"
          },
          {
            "field": "ip_hash"
          },
          {
            "field": "http_user_agent"
          }
        ]
      },
      "aggs": {
        "transferred": {
          "sum": {
            "field": "bytes_sent"
          }
        },
        "threshold": {
          "bucket_selector": {
            "buckets_path": {
              "total": "transferred"
            },
            "script": "params.total >= 12345"
          }
        }
      }
    },
    "valid_streams": {
      "stats_bucket": {
        "buckets_path": "streams>transferred"
      }
    }
  }
}
Explanation:
streams - Combined terms aggregations since every changed field in it should be counted as a new stream. This is mainly for better readability, change it if it doesn't fit your logic.
transferred - sum aggregation to sum up the sent bytes.
threshold - bucket_selector aggregation which filters out the streams that didn't reach the XY threshold.
valid_streams - stats_bucket aggregation which returns a count field containing the amount of buckets = valid streams. BTW, it also gives you info about your valid streams (i.e. average bytes).
filter_path - queryparam is used to reduce the returned response to only include the aggregation output.
Upvotes: 1
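If the threshold changes often, the request body could be generated instead of edited by hand. The Python sketch below only builds the body as a dict and reads the count out of an already parsed response; it does not talk to a cluster. The helper names `build_valid_streams_query` and `read_valid_stream_count` are hypothetical, the threshold is passed through the script's `params` block (an assumption about how one might parameterize it, not something the answer does), and `>=` is used to match the "gte" wording of the question.

```python
import json


def build_valid_streams_query(min_bytes, max_buckets=65536):
    """Build the search body from the answer, with the byte threshold as a script parameter."""
    return {
        "size": 0,
        "query": {"bool": {"must": [
            {"range": {"timestamp": {"gte": "now-1w/d", "lt": "now/d"}}},
            {"match": {"status": "206"}},
        ]}},
        "aggs": {
            "streams": {
                # One bucket per (media, ip_hash, http_user_agent) combination.
                "multi_terms": {
                    "size": max_buckets,
                    "terms": [
                        {"field": "media"},
                        {"field": "ip_hash"},
                        {"field": "http_user_agent"},
                    ],
                },
                "aggs": {
                    "transferred": {"sum": {"field": "bytes_sent"}},
                    # Drop buckets whose byte sum stays below the threshold.
                    "threshold": {"bucket_selector": {
                        "buckets_path": {"total": "transferred"},
                        "script": {
                            "source": "params.total >= params.min_bytes",
                            "params": {"min_bytes": min_bytes},
                        },
                    }},
                },
            },
            # Counts the buckets that survive the selector.
            "valid_streams": {"stats_bucket": {"buckets_path": "streams>transferred"}},
        },
    }


def read_valid_stream_count(response):
    """Extract the count that stats_bucket reports for the remaining stream buckets."""
    return response["aggregations"]["valid_streams"]["count"]


body = build_valid_streams_query(min_bytes=12345)
print(json.dumps(body, indent=2))
```

The printed JSON can be pasted into the same `GET _search?filter_path=aggregations.valid_streams.count` request, and `read_valid_stream_count` expects the filtered response shape that `filter_path` produces.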