Bhavesh Patel

Reputation: 25

How to deduplicate and perform aggregations using a single Elasticsearch query?

I have an index that stores employee details.

Each employee has a feedback field holding integer values (0-10).

I want to get the count of feedback entries, the average rating across all feedback, and the average rating per employee.

The problem is this:

The index contains two or more identical (duplicate) documents; records can be told apart by the employee id plus a feedback identifier.

I want the average and count to consider each such record only once, using only an ES query.

PS: We cannot remove the duplicates from the index.

Upvotes: 2

Views: 2878

Answers (2)

jaspreet chahal

Reputation: 9099

Data:

"hits" : [
      {
        "_index" : "index22",
        "_type" : "_doc",
        "_id" : "r_QurHEBvLUX24hJph0B",
        "_score" : 1.0,
        "_source" : {
          "empId" : 1,
          "feedbackId" : 1,
          "feedback" : 2
        }
      },
      {
        "_index" : "index22",
        "_type" : "_doc",
        "_id" : "sPQurHEBvLUX24hJ0R3x",
        "_score" : 1.0,
        "_source" : {
          "empId" : 1,
          "feedbackId" : 1,
          "feedback" : 2
        }
      },
      {
        "_index" : "index22",
        "_type" : "_doc",
        "_id" : "sfQurHEBvLUX24hJ5h16",
        "_score" : 1.0,
        "_source" : {
          "empId" : 1,
          "feedbackId" : 2,
          "feedback" : 6
        }
      }
    ]

I have taken one employee with id 1 and three feedback documents (two duplicates for feedbackId 1 and one for feedbackId 2). The total sum is 10; the distinct sum is 8.
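The arithmetic can be checked outside Elasticsearch. This is a minimal Python sketch, assuming the three sample documents above and treating the (empId, feedbackId) pair as the identity of a feedback record:

```python
# Sample documents copied from the hits above.
docs = [
    {"empId": 1, "feedbackId": 1, "feedback": 2},
    {"empId": 1, "feedbackId": 1, "feedback": 2},  # duplicate of the first
    {"empId": 1, "feedbackId": 2, "feedback": 6},
]

# Keep the first feedback value seen for each (empId, feedbackId) pair.
unique = {}
for d in docs:
    unique.setdefault((d["empId"], d["feedbackId"]), d["feedback"])

raw_sum = sum(d["feedback"] for d in docs)   # duplicates included
distinct_sum = sum(unique.values())          # duplicates ignored
distinct_count = len(unique)
distinct_avg = distinct_sum / distinct_count

print(raw_sum, distinct_sum, distinct_count, distinct_avg)  # 10 8 2 4.0
```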

Query:

I am using a scripted metric aggregation to build a dictionary of unique feedbackId keys and their feedback values.

  1. init_script

Executed prior to any collection of documents. Allows the aggregation to set up any initial state. Here it declares a hashtable named transactions.

  2. map_script

Executed once per document collected. It adds each feedbackId not yet seen, together with its feedback value, to the dictionary.

  3. combine_script

Executed once on each shard after document collection is complete. It returns that shard's dictionary.

  4. reduce_script

Executed once on the coordinating node after all shards have returned their results. It goes through the dictionaries returned from each shard, merges them into a single dictionary of unique entries, and then loops through that dictionary to get the sum (or count) of feedback.
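The four phases can be mimicked in plain Python to see why the duplicate is dropped even when the copies sit on different shards. This is only an illustration for a single employee bucket: the function names and the two-shard split are assumptions made for the example, not anything Elasticsearch exposes.

```python
def init_state():
    # init_script: each shard starts with an empty dictionary.
    return {"transactions": {}}

def map_doc(state, doc):
    # map_script: record the feedback value the first time a feedbackId is seen.
    state["transactions"].setdefault(doc["feedbackId"], doc["feedback"])

def combine(state):
    # combine_script: each shard hands back its dictionary.
    return state["transactions"]

def reduce_states(states):
    # reduce_script: merge the shard dictionaries, then sum the unique values.
    merged = {}
    for shard_map in states:
        for feedback_id, value in shard_map.items():
            merged.setdefault(feedback_id, value)
    return sum(merged.values())

# Hypothetical split: the two copies of feedbackId 1 land on different shards.
shards = [
    [{"feedbackId": 1, "feedback": 2}, {"feedbackId": 2, "feedback": 6}],
    [{"feedbackId": 1, "feedback": 2}],
]

states = []
for shard_docs in shards:
    state = init_state()
    for doc in shard_docs:
        map_doc(state, doc)
    states.append(combine(state))

distinct_sum = reduce_states(states)
print(distinct_sum)  # 8
```

Deduplicating only in the map phase would not be enough here, because each shard sees just one copy of feedbackId 1; the merge in the reduce phase is what removes the cross-shard duplicate.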

{
  "size": 0,
  "aggs": {
    "employee": {
      "terms": {
        "field": "empId",
        "size": 10000
      },
      "aggs": {
        "distinct_sum_feedback": {
          "scripted_metric": {
            "init_script": "state.transactions =new Hashtable();",
            "map_script": "if(state.transactions.get(doc.feedbackId.value)==null){state.transactions.put(doc.feedbackId.value, doc.feedback.value)}",
            "combine_script": "return state.transactions",
            "reduce_script": "def sum=0;def feedbacks=new Hashtable();for(a in states){for(entry in a.entrySet()){if(feedbacks.get(entry.getKey())==null){feedbacks.put(entry.getKey(),entry.getValue());}}}for(entry in feedbacks.entrySet()){sum+=entry.getValue();}    return sum;"
          }
        },
        "distinct_count_feedback": {
          "cardinality": {
            "field": "feedbackId"
          }
        },
        "distinct_avg_feedback": {
          "bucket_script": {
            "buckets_path": {
              "sum": "distinct_sum_feedback.value",
              "count": "distinct_count_feedback.value"
            },
            "script": "params.sum/params.count"
          }
        }
      }
    },
    "sum_feedback": {
      "sum_bucket": {
        "buckets_path": "employee>distinct_sum_feedback.value"
      }
    },
    "count_feedback": {
      "sum_bucket": {
        "buckets_path": "employee>distinct_count_feedback.value"
      }
    }
  }
}

Result:

Distinct count for user 1: 2

Distinct sum for user 1: 8 (10 if duplicates are counted)

"aggregations" : {
    "employee" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 1,
          "doc_count" : 3,
          "distinct_count_feedback" : {
            "value" : 2
          },
          "distinct_sum_feedback" : {
            "value" : 8
          },
          "distinct_avg_feedback" : {
            "value" : 4.0
          }
        },
        {
          "key" : 2,
          "doc_count" : 1,
          "distinct_count_feedback" : {
            "value" : 1
          },
          "distinct_sum_feedback" : {
            "value" : 6
          },
          "distinct_avg_feedback" : {
            "value" : 6.0
          }
        }
      ]
    },
    "sum_feedback" : {
      "value" : 14.0
    },
    "count_feedback" : {
      "value" : 3.0
    }
  }
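The response carries the overall sum and count but not the overall average. One way to derive it is on the client; a small Python sketch, assuming the response body has already been parsed into a dictionary with the shape shown above:

```python
# Only the two top-level values from the response above are needed here.
response = {
    "aggregations": {
        "sum_feedback": {"value": 14.0},
        "count_feedback": {"value": 3.0},
    }
}

aggs = response["aggregations"]
total = aggs["sum_feedback"]["value"]
count = aggs["count_feedback"]["value"]

# Guard against an empty result set before dividing.
overall_avg = total / count if count else None
print(round(overall_avg, 2))  # 4.67
```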

Upvotes: 1

Rose

Reputation: 53

This sounds like a case for a couple of bucket aggregations. I can't see a way to get all this information in a single call, but I think there is certainly a way to aggregate your data to get the information you need.

About bucket aggregations.

In particular you could use a terms aggregation on the employee Id and feedback identifier to get each employee in its own bucket. From there, you can get a count of feedbacks per employee.

You could do similar bucket queries on only the feedback identifier to get a count of feedbacks across all your records.

And here is a reference to Avg_Bucket aggregation which you can use to calculate averages on your data buckets (groups).

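Note that there is a dynamic cluster setting (search.max_buckets) that limits how many buckets a single response may contain; the default has been 10,000 in several 7.x releases, so very large terms aggregations can hit it.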

I think something like this might be what you're after, though I don't have an index to test it against. Perhaps this can get you on the right track looking at how to combine the various aggregations:

POST /_search
{
  "size": 0,
  "aggs": {
    "employees": {
      "terms": {
        "field": "employeeId"
      },
      "aggs": {
        "feedbacks": {
          "value_count": {
            "field": "feedbackId"
          }
        }
      }
    },
    "avg_feedback_score": {
      "avg_bucket": {
        "buckets_path": "employees>feedbacks" 
      }
    }
  }
}
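To make explicit what a query of this shape would compute, here is a rough Python sketch. It assumes the inner aggregation counts feedbackId values per employee (value_count in Elasticsearch) and uses made-up documents, so treat it as an illustration rather than tested output.

```python
from collections import defaultdict

# Hypothetical documents; employee 1 has one duplicated feedback record.
docs = [
    {"employeeId": 1, "feedbackId": 1},
    {"employeeId": 1, "feedbackId": 1},  # duplicate
    {"employeeId": 1, "feedbackId": 2},
    {"employeeId": 2, "feedbackId": 3},
]

# terms on employeeId plus a per-bucket count of feedbackId values.
per_employee_count = defaultdict(int)
for d in docs:
    per_employee_count[d["employeeId"]] += 1

# avg_bucket: the mean of the per-bucket metric across all buckets.
avg_of_counts = sum(per_employee_count.values()) / len(per_employee_count)
print(dict(per_employee_count), avg_of_counts)  # {1: 3, 2: 1} 2.0
```

Two things stand out: the value is the average number of feedback entries per employee, not an average rating, and duplicates are still counted, so deduplication has to come from elsewhere (for example a cardinality aggregation or the scripted metric in the other answer).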

Upvotes: 1
