dablumino
dablumino

Reputation: 1

ElasticSearch - Get uniqe query results by id together with cardinality aggregation

Hello I would like to archive the following, we have a data pool there we store a lot of duplicated items (see a sample below) and i need to query all latest entry's by an distinct id, after wards i need to group the results by state to get a count of the occurences.

I've trying the following query with the result that I'll get items with each state but it should be only the newest item.

e.g with the sample data it should be only 2 and not 3 entries.

{
  "size": 0,
  "sort": [
    { "timestamp": { "order": "desc" } }
  ],
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "projectName": "XYZ"
          }
        }
      ]
     
    }
  },
  "aggs": {
    "categories": {
      "terms": {
        "field" : "state", "size": 50
      }
    },
    "aggs": {
      "item_count": {
         "cardinality": {
            "field": "id"
          }
        }
      }
  }
}

The data structure look like the following:

{
    "state": "Implemented",
    "priority": "Low",
    "severity": "Minor",
    "id": 9898,
    "timestamp": "2024-10-01T00:01:12.881358+00:00",
    "projectName": "XYZ"
},
{
    "state": "Closed",
    "priority": "Low",
    "severity": "Minor",
    "id": 9898,
    "timestamp": "2024-10-08T00:01:12.881358+00:00",
    "projectName": "XYZ"
},
{
    "state": "Implemented",
    "priority": "Low",
    "severity": "Minor",
    "id": 999,
    "timestamp": "2024-10-01T00:01:12.881358+00:00",
    "projectName": "XYZ"
},

Kind regards

Upvotes: 0

Views: 29

Answers (1)

G0l0s
G0l0s

Reputation: 496

I couldn’t find a non-scripted way. Therefore I use scripted_metric

Your sample documents

PUT /unique_aggregation/_bulk
{"create":{"_id":1}}
{"state":"Implemented","priority":"Low","severity":"Minor","id":9898,"timestamp":"2024-10-01T00:01:12.881358+00:00","projectName":"XYZ"}
{"create":{"_id":2}}
{"state":"Closed","priority":"Low","severity":"Minor","id":9898,"timestamp":"2024-10-08T00:01:12.881358+00:00","projectName":"XYZ"}
{"create":{"_id":3}}
{"state":"Implemented","priority":"Low","severity":"Minor","id":999,"timestamp":"2024-10-01T00:01:12.881358+00:00","projectName":"XYZ"}

Query

GET /unique_aggregation/_search?filter_path=aggregations
{
    "query": {
        "bool": {
            "must": [
                {
                    "match": {
                        "projectName": "XYZ"
                    }
                }
            ]
        }
    },
    "aggs": {
        "last_id_by_states": {
            "scripted_metric": {
                "init_script": """
                    state.items = new HashMap()
                """,
                "map_script": """
                    String id = doc.id.value.toString();
                    if (state.items.containsKey(id)) {
                        Map docMap = state.items.get(id);
                        if (docMap.timestamp.isAfter(doc.timestamp.value)) {
                            return;
                        }
                    }
                    Map docMap = new HashMap();
                    docMap.state = doc['state.keyword'].value;
                    docMap.timestamp = doc.timestamp.value;
                    state.items.put(id, docMap);
                    """,
                "combine_script": "return state.items",
                "reduce_script": """
                    Map totalItems = new HashMap();
                    for (items in states) {
                        totalItems.putAll(items);
                    }
                    Map stateMap = totalItems.values().stream().collect(Collectors.groupingBy(k -> k.state, Collectors.counting()));
                    return stateMap;
                """
            }
        }
    }
}

In the map_script stage a document is added to a map. The map has a structure items[id]=Map[state][timestamp]

Please test the combine_script stage. My cluster is single-sharded

In the reduce_script stage the map converted into a new map by a stream. The new map has structure stateMap[state] = docCount

Response

{
    "aggregations" : {
        "last_id_by_states" : {
            "value" : {
                "Implemented" : 1,
                "Closed" : 1
            }
        }
    }
}

Upvotes: 0

Related Questions