redgiant

Reputation: 504

How do I filter a 'reduce inputs' over a large stream of objects?

I use the filter below to build a map from unique keys to a [count, total duration] pair. Currently it processes every input via 'reduce inputs'.

reduce inputs as $r
({};
("Pipeline:" + $r.m."topic.type") as $topic
| ("Channel:" + $r.channel) as $channel
| ("Campaign:" + $r.campaign) as $campaign
| ("Cellcode:" + $r.cellcode) as $cellcode
| ("Tracking:" + $r.tracking) as $tracking
| ("Template:" + $r.m."template.id") as $template
| ("Event:" + $r.name) as $event
| ("Reason:" + $r.reason) as $reason
| ($r.duration|tonumber) as $duration
| (($topic + ":" + $channel + ":" + $campaign + ":" + $cellcode + ":" + $tracking + ":" + $template + ":" + $event + ":" + $reason) as $key
  | .[$key][0] += 1 | .[$key][1] += $duration)
)

I cannot figure out where to put a select() filter so that the reduce runs over only those entries that pass a 'select($r.type == "AUDIT_CHANNEL")' check. The goal is to skip the two "type":"AUDIT_SYSTEM" events in this test data:

{"type":"AUDIT_CHANNEL","name":"DROPPED","reason":"INVALID_MAIL_META_DATA","start":"1472083067058","duration":"91","end":"1472083067149","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befd00b2-0x293","channel":"EMAIL","m":{"audited":"1472083067058","created":"1472083066974","enabled":"true","entity.common.version":"1","template.id":"2840df6d-d9e8-4f27-e8b5-918c122d4561","template.version":"17","topic.curname":"eddude-default-topic","topic.curtype":"DEFAULT","topic.dc":"LVS","topic.name":"eddude-default-topic","topic.part":"5","topic.type":"DEFAULT"},"id":"0AEC4350-1C6E2FC9B80-0156BEF9ED92-0000000000000003","campaign":"999","contract":"a5872a5c-8912-dd63-583f-61fa8db3efde","user":1276847275,"cellcode":"","age":"175"}

{"type":"AUDIT_SYSTEM","name":"ROTATED","start":"1472083081033","duration":"0","end":"1472083081033","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befd3749-0xce"}

{"type":"AUDIT_SYSTEM","name":"ROTATED","start":"1472083141034","duration":"0","end":"1472083141034","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befe21aa-0xce"}

{"type":"AUDIT_CHANNEL","name":"RECEIVED","start":"1472083158860","duration":"109","end":"1472083158969","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befe674c-0x10f","channel":"EMAIL","m":{"audited":"1472083158860","created":"1472083158860","enabled":"true","entity.common.version":"1","template.id":"2840df6d-d9e8-4f27-e8b5-918c122d4561","template.version":"17","topic.curname":"eddude-default-topic","topic.curtype":"DEFAULT","topic.dc":"LVS","topic.name":"eddude-default-topic","topic.part":"5","topic.type":"DEFAULT"},"id":"0AEC4350-1C6E2FC9B80-0156BEF9ED92-0000000000000004","campaign":"999","contract":"a5872a5c-8912-dd63-583f-61fa8db3efde","user":1276847275,"cellcode":"","age":"109"}

I tried putting it in front of the reduce, inside the reduce, etc., but I don't get the desired output, which is:

{
  "Pipeline:DEFAULT:Channel:EMAIL:Campaign:999:Cellcode::Tracking::Template:2840df6d-d9e8-4f27-e8b5-918c122d4561:Event:DROPPED:Reason:INVALID_MAIL_META_DATA": [
    1,
    91
  ],
  "Pipeline:DEFAULT:Channel:EMAIL:Campaign:999:Cellcode::Tracking::Template:2840df6d-d9e8-4f27-e8b5-918c122d4561:Event:RECEIVED:Reason:": [
    1,
    109
  ]
}

Do I have to perform filtering totally outside of the reduce run, or am I just not aware of how to do this with a single filter-and-reduce?

Btw, assume this input is a giant stream of millions of records, with a few hundred unique "keys" that get calculated for accumulating into.

Upvotes: 1

Views: 68

Answers (1)

Jeff Mercado

Reputation: 134501

inputs emits every remaining input, one at a time, and reduce folds over whatever stream it is given. To reduce over only the matching records, filter that stream by type before each record is bound to $r:

reduce (inputs | select(.type == "AUDIT_CHANNEL")) as $r ...

I would write your filter like so:

reduce (inputs | select(.type == "AUDIT_CHANNEL")) as $r ({};
    ([
        "Pipeline", $r.m."topic.type",
        "Channel",  $r.channel,
        "Campaign", $r.campaign,
        "Cellcode", $r.cellcode,
        "Tracking", $r.tracking,
        "Template", $r.m."template.id",
        "Event",    $r.name,
        "Reason",   $r.reason
    ] | join(":")) as $key
    | .[$key] |= [ .[0]+1, .[1]+($r.duration|tonumber) ]
)
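
To try this end to end, here is a minimal sketch of an invocation. The file name events.jsonl and the trimmed-down records are made up for illustration, and the key uses only three of the question's fields to keep it short. Because the filter reads everything through inputs, jq should be run with -n; without it, jq reads the first record into "." and inputs never sees it. The second command is an alternative that is not from the answer above: it keeps 'reduce inputs' and does the check inside the update with if/else, which should give the same result.

```shell
# Hypothetical sample file: three trimmed-down records in the question's shape.
cat > events.jsonl <<'EOF'
{"type":"AUDIT_CHANNEL","name":"DROPPED","reason":"INVALID_MAIL_META_DATA","duration":"91","channel":"EMAIL"}
{"type":"AUDIT_SYSTEM","name":"ROTATED","duration":"0"}
{"type":"AUDIT_CHANNEL","name":"DROPPED","reason":"INVALID_MAIL_META_DATA","duration":"9","channel":"EMAIL"}
EOF

# Filter the stream before reducing.
# -n: do not read the first record into "." (so inputs sees all records)
# -c: print the result on a single line
result=$(jq -nc '
  reduce (inputs | select(.type == "AUDIT_CHANNEL")) as $r ({};
    (["Channel", $r.channel, "Event", $r.name, "Reason", $r.reason] | join(":")) as $key
    | .[$key] |= [ .[0] + 1, .[1] + ($r.duration | tonumber) ]
  )
' events.jsonl)

# Alternative: reduce over every input, but leave the accumulator
# unchanged ("else .") for records of any other type.
result_if=$(jq -nc '
  reduce inputs as $r ({};
    if $r.type == "AUDIT_CHANNEL" then
      (["Channel", $r.channel, "Event", $r.name, "Reason", $r.reason] | join(":")) as $key
      | .[$key] |= [ .[0] + 1, .[1] + ($r.duration | tonumber) ]
    else . end
  )
' events.jsonl)

echo "$result"
# {"Channel:EMAIL:Event:DROPPED:Reason:INVALID_MAIL_META_DATA":[2,100]}
echo "$result_if"
# {"Channel:EMAIL:Event:DROPPED:Reason:INVALID_MAIL_META_DATA":[2,100]}
```

Either way, the accumulator only ever holds one entry per unique key, so memory use should stay proportional to the few hundred keys rather than to the millions of input records.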

Upvotes: 1
