Reputation: 1825
I am trying to enrich my table1 data by adding field_to_enrich1 and 2 where fields 1-3 are the same and the _time is right before the _time of my event in table1.
To clarify based on comments, "right before" I mean the first log event that happens immediately prior to the _time field of my current event where fields 1-3 are all a match.
I have conducted a left join on field1,field2,field3 but am trying to figure out how to conduct the _time correlation between the two tables.
I have two tables within splunk like below.
Table1
_time,field1,field2,field3,field4
2022-11-10 13:19:55.308,oepwy0s4mjt,n6u,field4_random_123
2022-11-10 13:19:56.308,6onbcity1n2,lwe,field4_random_456
2022-11-10 13:19:57.308,9rfkuntl7qx,2tc,field4_random_567
2022-11-10 13:19:58.308,fn44tlt6rtt,8tm,field4_random_234
2022-11-10 13:19:59.308,gj11nax4o68,lr3,field4_random_458
2022-11-10 13:20:00.308,mdgdj03sx9c,7pc,field4_random_124
Table2
_time,field1,field2,field3,field_to_enrich1,field_to_enrich2
2022-11-10 13:19:55.108,oepwy0s4mjt,n6u,83zuyt8vdyFF,ljr5furt0mFF
2022-11-10 13:19:55.208,oepwy0s4mjt,n6u,83zuyt8vdy75,ljr5furt0mfs
2022-11-10 13:19:56.108,6onbcity1n2,lwe,yeg1lhraoeGG,ngmly4majhGG
2022-11-10 13:19:56.208,6onbcity1n2,lwe,yeg1lhraoef0,ngmly4majhom
2022-11-10 13:19:57.108,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:57.208,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:58.108,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:58.208,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:59.208,oepwy0s4mjt,n6u,asdfasdfasdf,asdfasdfasdf
2022-11-10 13:20:00.208,oepwy0s4mjt,n6u,oimlkmjhgggh,asdfiiiidddd
Example output with the above tables is below.
Table3
_time,field1,field2,field3,field_to_enrich1,field_to_enrich2
2022-11-10 13:19:55.308,oepwy0s4mjt,n6u,field4_random_123,83zuyt8vdy75,ljr5furt0mfs
2022-11-10 13:19:56.308,6onbcity1n2,lwe,field4_random_456,yeg1lhraoef0,ngmly4majhom
2022-11-10 13:19:57.308,9rfkuntl7qx,2tc,field4_random_567,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:58.308,fn44tlt6rtt,8tm,field4_random_234,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:59.308,gj11nax4o68,lr3,field4_random_458,FILLNULL,FILLNULL2
2022-11-10 13:20:00.308,mdgdj03sx9c,7pc,field4_random_124,FILLNULL,FILLNULL2
Any help would be greatly appreciated.
Upvotes: 0
Views: 165
Reputation: 9926
I prefer to avoid join
because it is expensive, but don't have an alternative. We can handle the "_time right before" requirement by using the dedup
command to discard all but the latest event for a given set of fields.
<<your search for Table1>>
| fields _time,field1,field2,field3,field4
| join type=left field1,field2,field3 [
<<your search for Table2>>
| _time,field1,field2,field3,field_to_enrich1,field_to_enrich2
```Keep only the most recent event for each triplet```
| dedup field1,field2,field3
]
| fillnull value="FILLNULL" field_to_enrich1
| fillnull value="FILLNULL2" field_to_enrich2
| table _time,field1,field2,field3,field_to_enrich1,field_to_enrich2
Answer #2 Here's some ugliness that should handle duplicative events - at least it works with the sample data. Note: I've removed references to 'field3' since it's not included in the data. Also, I changed _time to time in the samples so _time can be used in the query.
| makeresults
| eval data="time,field1,field2,field4
2022-11-10 13:19:55.308,oepwy0s4mjt,n6u,field4_random_123
2022-11-10 13:19:56.308,6onbcity1n2,lwe,field4_random_456
2022-11-10 13:19:57.308,9rfkuntl7qx,2tc,field4_random_567
2022-11-10 13:19:58.308,fn44tlt6rtt,8tm,field4_random_234
2022-11-10 13:19:59.308,gj11nax4o68,lr3,field4_random_458
2022-11-10 13:20:00.308,mdgdj03sx9c,7pc,field4_random_124"
| eval _raw=data
| multikv forceheader=1
| eval _time=strptime(time,"%Y-%m-%d %H:%M:%S.%3N")
| sort - _time
```Above defines test data. Replace with your search for Table1```
| fields _time,field1,field2,field4
```Define fields we'll need in the map command```
| eval etime=_time,efield1=field1,efield2=field2,efield4=field4
```Repeat a search for each event in in Table1
Change the value of maxsearches based on the expected number of rows in Table1```
| map maxsearches=1000 search="|makeresults | eval data=\"time,field1,field2,field_to_enrich1,field_to_enrich2
2022-11-10 13:19:55.108,oepwy0s4mjt,n6u,83zuyt8vdyFF,ljr5furt0mFF
2022-11-10 13:19:55.208,oepwy0s4mjt,n6u,83zuyt8vdy75,ljr5furt0mfs
2022-11-10 13:19:56.108,6onbcity1n2,lwe,yeg1lhraoeGG,ngmly4majhGG
2022-11-10 13:19:56.208,6onbcity1n2,lwe,yeg1lhraoef0,ngmly4majhom
2022-11-10 13:19:57.108,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:57.208,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:58.108,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:58.208,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:59.208,oepwy0s4mjt,n6u,asdfasdfasdf,asdfasdfasdf
2022-11-10 13:20:00.208,oepwy0s4mjt,n6u,oimlkmjhgggh,asdfiiiidddd\"
| eval _raw=data | multikv forceheader=1
| eval _time=strptime(time,\"%Y-%m-%d %H:%M:%S.%3N\")
```Everything from "|makeresults" to here defines test data.
Replace with your search for Table2```
| sort - _time
```Look for the fields passed in from Table1```
| search field1=$efield1$ field2=$efield2$ _time<$etime$
```If get more than one, pick the first (most recent)```
| head 1
```Use the values of _time and field4 from Table1```
| eval _time=$etime$, field4=\"$efield4$\"
```If nothing was found in Table2 then assign values```
| appendpipe [stats count | eval _time=$etime$, field1=\"$efield1$\", field2=\"$efield2$\", field4=\"$efield4$\", field_to_enrich1=\"FILLNULL\", field_to_enrich2=\"FILLNULL2\" | where count=0 | fields - count]
| table _time,field1,field2,field4,field_to_enrich1,field_to_enrich2"
Upvotes: 0