Reputation: 8224
TL;DR: I want to do the equivalent of Haskell's zipWith with buckets in elasticsearch.
I have an index with time and value "tuples", and each entry also has a head_id pointing to meta information about a series of such tuples; it's the timeseries ID. Visualized it might look like this:
head_id | timestamp | value
---------+---------------+-------
1 | 1104537600000 | 10
1 | 1104538500000 | 20
1 | 1104539400000 | 30
2 | 1104537600000 | 1000
2 | 1104538500000 | 2000
2 | 1104539400000 | 3000
Let's represent each individual timeseries as a list like this, for clarity:
1: [ 10, 20, 30]
2: [1000, 2000, 3000]
What I want to achieve is to "zip" those series together in an elasticsearch aggregation. Let's say I want to sum them:
result: [1010, 2020, 3030]
I currently need to fetch all the data and do the desired operation in application code. Now, to save memory and network bandwidth, I want to perform some operations like this directly within elasticsearch.
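For reference, the client-side operation I'm currently doing looks something like this (a minimal Python sketch, assuming the series have already been fetched and are aligned; the `zip_with` helper is my own naming):

```python
from functools import reduce

# Two already-fetched, aligned series (values only, same length each)
series = {
    1: [10, 20, 30],
    2: [1000, 2000, 3000],
}

def zip_with(op, *seqs):
    """Apply op element-wise across N equal-length sequences,
    like Haskell's zipWith generalized to N lists."""
    return [reduce(op, column) for column in zip(*seqs)]

result = zip_with(lambda a, b: a + b, *series.values())
print(result)  # [1010, 2020, 3030]
```

This is exactly the step I'd like to push down into elasticsearch.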
In this case, because the values I want to add up share the same timestamp, I was able to achieve this using a terms bucket aggregation with a sum sub-aggregation:
GET /timeseries/_search
{
  "aggs": {
    "result": {
      "terms": {"field": "timestamp"},
      "aggs": {
        "values_sum": {
          "sum": {"field": "value"}
        }
      }
    }
  }
}
returns (simplified):
{
  "aggregations": {
    "result": {
      "buckets": [
        {
          "key": 1104537600000,
          "doc_count": 2,
          "values_sum": {"value": 1010}
        },
        {
          "key": 1104538500000,
          "doc_count": 2,
          "values_sum": {"value": 2020}
        },
        {
          "key": 1104539400000,
          "doc_count": 2,
          "values_sum": {"value": 3030}
        }
      ]
    }
  }
}
However, in my case it isn't guaranteed that the timeseries' timestamps will align like this, which means I need a more general way of aggregating 2 (or, more generally, N) timeseries, assuming each has the same number of values.
A potential workaround I thought of was to shift the beginning of each timeseries to 0, and then use the above technique. However, I don't know how I could achieve that.
Another potential workaround I thought of was to first aggregate over head_id to get a bucket for each timeseries, and then use something like the serial differencing aggregation with lag=1. I can't use that aggregation though, because I want to do operations other than just subtraction, and it requires the buckets to be generated by a histogram aggregation, which isn't the case for me.
Upvotes: 0
Views: 47
Reputation: 8224
A potential workaround I thought of was to shift the beginning of each timeseries to 0, and then use the above technique. However, I don't know how I could achieve that.
This can be achieved using a script for the terms bucket key. It looks like this:
GET /timeseries/_search
{
  "aggs": {
    "result": {
      "terms": {
        "field": "timestamp",
        "script": {
          "inline": "_value - params.anchors[doc['head_id'].value.toString()]",
          "params": {
            "anchors": {
              "1": 1104537600000,
              "2": 1104624000000,
              ...
            }
          }
        }
      },
      "aggs": {
        "values_sum": {
          "sum": {"field": "value"}
        }
      }
    }
  }
}
Where anchors is a map associating each head_id with the time instant that the respective series should start at. Note that the keys are strings, since the numeric head_id is converted to a string for the lookup in the script.
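The anchors map itself has to be computed up front, e.g. with a terms aggregation on head_id and a min sub-aggregation on timestamp, and then passed in as script params. A minimal Python sketch of that client-side step (the aggregation names "per_head" and "min_start" are my own, and the response dict stands in for a real search response):

```python
# Hypothetical (simplified) response of a terms(head_id) + min(timestamp)
# aggregation, mirroring the simplified responses shown above.
response = {
    "aggregations": {
        "per_head": {
            "buckets": [
                {"key": 1, "min_start": {"value": 1104537600000}},
                {"key": 2, "min_start": {"value": 1104624000000}},
            ]
        }
    }
}

# Build the anchors params map; keys must be strings to match the
# string lookup done in the terms script.
anchors = {
    str(bucket["key"]): int(bucket["min_start"]["value"])
    for bucket in response["aggregations"]["per_head"]["buckets"]
}
print(anchors)  # {'1': 1104537600000, '2': 1104624000000}
```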
Upvotes: 0