Felk

zip-like bucket aggregation

TL;DR: I want to do the equivalent of Haskell's zipWith with buckets in Elasticsearch.

I have an index with time and value "tuples", and each entry also has a head_id pointing to meta information about a series of such tuples; it is effectively the timeseries ID. Visualized, it might look like this:

 head_id |     timestamp | value
---------+---------------+-------
       1 | 1104537600000 |    10
       1 | 1104538500000 |    20
       1 | 1104539400000 |    30
       2 | 1104537600000 |  1000
       2 | 1104538500000 |  2000
       2 | 1104539400000 |  3000
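
For reference, a minimal index mapping that would support the queries below might look like this (the concrete field types here, keyword/date/double, are assumptions):

PUT /timeseries
{
  "mappings": {
    "properties": {
      "head_id":   {"type": "keyword"},
      "timestamp": {"type": "date"},
      "value":     {"type": "double"}
    }
  }
}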

Let's represent each individual timeseries as a list like this, for clarity:

1: [  10,   20,   30]
2: [1000, 2000, 3000]

What I want to achieve is to "zip" those series together in an Elasticsearch aggregation. Let's say I want to sum them:

result: [1010, 2020, 3030]

I currently need to fetch all the data and do the desired operation in application code. To save memory and network bandwidth, I want to perform such operations directly within Elasticsearch.

In this case, because the values I want to add up share the same timestamp, I was able to achieve this using a terms bucket aggregation with a sum sub-aggregation:

GET /timeseries/_search
{
  "aggs": {
    "result": {
      "terms": {"field": "timestamp"},
      "aggs": {
        "values_sum": {
          "sum": {"field": "value"}
        }
      }
    }
  } 
}

returns (simplified):

{
  "aggregations": {
    "result": {
      "buckets": [
        {
          "key": 1104537600000,
          "doc_count": 2,
          "values_sum": {"value": 1010}
        },
        {
          "key": 1104538500000,
          "doc_count": 2,
          "values_sum": {"value": 2020}
        },
        {
          "key": 1104539400000,
          "doc_count": 2,
          "values_sum": {"value": 3030}
        }
      ]
    }
  }
}
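
One caveat worth noting: terms buckets are sorted by doc_count by default, so to read the result back as a time-ordered series you would probably want to order by key and raise the bucket limit. A sketch (the size of 100 is an arbitrary assumption; on Elasticsearch versions before 6.0 the order key is spelled _term instead of _key):

GET /timeseries/_search
{
  "aggs": {
    "result": {
      "terms": {
        "field": "timestamp",
        "order": {"_key": "asc"},
        "size": 100
      },
      "aggs": {
        "values_sum": {
          "sum": {"field": "value"}
        }
      }
    }
  }
}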

However, in my case it isn't guaranteed that the timeseries' timestamps align like this, which means I need a more general way of aggregating two (or, in general, N) timeseries, assuming they each have the same number of values.

A potential workaround I thought of was to shift the beginning of each timeseries to 0, and then use the above technique. However, I don't know how I could achieve that.

Another potential workaround I thought of was to first aggregate over head_id to get a bucket for each timeseries, and then use something like the serial differencing aggregation with lag=1. I can't use that aggregation, though, because I want to perform operations other than subtraction, and it requires the buckets to be generated by a histogram aggregation, which isn't the case for me.
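
For reference, serial differencing is a pipeline aggregation that has to hang off a histogram or date_histogram, roughly like this (the 15-minute fixed_interval merely matches the sample data above and is an assumption; older Elasticsearch versions spell the parameter interval):

GET /timeseries/_search
{
  "aggs": {
    "over_time": {
      "date_histogram": {"field": "timestamp", "fixed_interval": "15m"},
      "aggs": {
        "values_sum": {"sum": {"field": "value"}},
        "diff": {
          "serial_diff": {"buckets_path": "values_sum", "lag": 1}
        }
      }
    }
  }
}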

Upvotes: 0

Views: 47

Answers (1)

Felk

"A potential workaround I thought of was to shift the beginning of each timeseries to 0, and then use the above technique. However, I don't know how I could achieve that."

This can be achieved using a script for the terms bucket key. It looks like this:

GET /timeseries/_search
{
  "aggs": {
    "result": {
      "terms": {
        "field": "timestamp",
        "script": {
          "inline": "_value - params.anchors[doc.head_id.value]",
          "params": {
            "anchors": {
              "1": 1104537600000,
              "2": 1104624000000,
              ...
            }
          }
        }
      },
      "aggs": {
        "values_sum": {
          "sum": {"field": "value"}
        }
      }
    }
  }
}

Where anchors is a map associating each head_id with the time instant that series should start at. Note the String.valueOf: the params map is keyed by strings, so a numeric head_id doc value needs to be converted to a string before the lookup.
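
If the anchors aren't known up front, one way to compute them first (a sketch; it assumes the number of distinct series fits in the terms bucket limit) is a terms aggregation on head_id with a min sub-aggregation on timestamp:

GET /timeseries/_search
{
  "size": 0,
  "aggs": {
    "series": {
      "terms": {"field": "head_id"},
      "aggs": {
        "start": {"min": {"field": "timestamp"}}
      }
    }
  }
}

Each bucket's start.value is the first timestamp of that series and can be filled into the anchors map of the main query.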

Upvotes: 0
