johnnyb

Reputation: 1825

Enrich Splunk search data based on temporal correlation from another search

I am trying to enrich my Table1 data by adding field_to_enrich1 and field_to_enrich2 from Table2, where field1, field2, and field3 match and the Table2 _time is right before the _time of my event in Table1.

To clarify based on comments: by "right before" I mean the log event that occurs immediately prior to the _time of my current event and for which fields 1-3 all match.

I have conducted a left join on field1,field2,field3 but am trying to figure out how to conduct the _time correlation between the two tables.

I have two tables within Splunk, shown below.

Table1

_time,field1,field2,field3,field4
2022-11-10 13:19:55.308,oepwy0s4mjt,n6u,field4_random_123 
2022-11-10 13:19:56.308,6onbcity1n2,lwe,field4_random_456
2022-11-10 13:19:57.308,9rfkuntl7qx,2tc,field4_random_567
2022-11-10 13:19:58.308,fn44tlt6rtt,8tm,field4_random_234
2022-11-10 13:19:59.308,gj11nax4o68,lr3,field4_random_458
2022-11-10 13:20:00.308,mdgdj03sx9c,7pc,field4_random_124

Table2

_time,field1,field2,field3,field_to_enrich1,field_to_enrich2
2022-11-10 13:19:55.108,oepwy0s4mjt,n6u,83zuyt8vdyFF,ljr5furt0mFF
2022-11-10 13:19:55.208,oepwy0s4mjt,n6u,83zuyt8vdy75,ljr5furt0mfs
2022-11-10 13:19:56.108,6onbcity1n2,lwe,yeg1lhraoeGG,ngmly4majhGG
2022-11-10 13:19:56.208,6onbcity1n2,lwe,yeg1lhraoef0,ngmly4majhom
2022-11-10 13:19:57.108,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:57.208,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:58.108,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:58.208,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:59.208,oepwy0s4mjt,n6u,asdfasdfasdf,asdfasdfasdf
2022-11-10 13:20:00.208,oepwy0s4mjt,n6u,oimlkmjhgggh,asdfiiiidddd

Example output with the above tables is below.

Table3

_time,field1,field2,field3,field_to_enrich1,field_to_enrich2
2022-11-10 13:19:55.308,oepwy0s4mjt,n6u,field4_random_123,83zuyt8vdy75,ljr5furt0mfs
2022-11-10 13:19:56.308,6onbcity1n2,lwe,field4_random_456,yeg1lhraoef0,ngmly4majhom
2022-11-10 13:19:57.308,9rfkuntl7qx,2tc,field4_random_567,pfe6vssh0qej,me4yghhmj26t
2022-11-10 13:19:58.308,fn44tlt6rtt,8tm,field4_random_234,8l06613lartf,bx5h3v9l1udg
2022-11-10 13:19:59.308,gj11nax4o68,lr3,field4_random_458,FILLNULL,FILLNULL2
2022-11-10 13:20:00.308,mdgdj03sx9c,7pc,field4_random_124,FILLNULL,FILLNULL2

Any help would be greatly appreciated.

Upvotes: 0

Views: 165

Answers (1)

RichG

Reputation: 9926

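I prefer to avoid join because it is expensive, but I don't have an alternative here. We can handle the "_time right before" requirement by using the dedup command to discard all but the latest event for a given set of fields. This works because Splunk returns events newest first by default, and dedup keeps the first event it sees for each combination of field values.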

<<your search for Table1>>
| fields _time,field1,field2,field3,field4
| join type=left field1,field2,field3 [
  <<your search for Table2>>
  | fields _time,field1,field2,field3,field_to_enrich1,field_to_enrich2
  ```Keep only the most recent event for each triplet```
  | dedup field1,field2,field3
]
| fillnull value="FILLNULL" field_to_enrich1
| fillnull value="FILLNULL2" field_to_enrich2
| table _time,field1,field2,field3,field4,field_to_enrich1,field_to_enrich2

Answer #2

Here's some ugliness that should handle duplicate events; at least it works with the sample data. Note: I've removed references to field3 since it's not included in the sample data. I also changed _time to time in the samples so that _time can be used in the query.

| makeresults 
| eval data="time,field1,field2,field4
2022-11-10 13:19:55.308,oepwy0s4mjt,n6u,field4_random_123 
2022-11-10 13:19:56.308,6onbcity1n2,lwe,field4_random_456
2022-11-10 13:19:57.308,9rfkuntl7qx,2tc,field4_random_567
2022-11-10 13:19:58.308,fn44tlt6rtt,8tm,field4_random_234
2022-11-10 13:19:59.308,gj11nax4o68,lr3,field4_random_458
2022-11-10 13:20:00.308,mdgdj03sx9c,7pc,field4_random_124" 
| eval _raw=data 
| multikv forceheader=1 
| eval _time=strptime(time,"%Y-%m-%d %H:%M:%S.%3N") 
| sort - _time
```Above defines test data.  Replace with your search for Table1```
| fields _time,field1,field2,field4 
```Define fields we'll need in the map command```
| eval etime=_time,efield1=field1,efield2=field2,efield4=field4 
```Repeat a search for each event in Table1
   Change the value of maxsearches based on the expected number of rows in Table1```
| map maxsearches=1000 search="|makeresults | eval data=\"time,field1,field2,field_to_enrich1,field_to_enrich2
    2022-11-10 13:19:55.108,oepwy0s4mjt,n6u,83zuyt8vdyFF,ljr5furt0mFF
    2022-11-10 13:19:55.208,oepwy0s4mjt,n6u,83zuyt8vdy75,ljr5furt0mfs
    2022-11-10 13:19:56.108,6onbcity1n2,lwe,yeg1lhraoeGG,ngmly4majhGG
    2022-11-10 13:19:56.208,6onbcity1n2,lwe,yeg1lhraoef0,ngmly4majhom
    2022-11-10 13:19:57.108,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
    2022-11-10 13:19:57.208,9rfkuntl7qx,2tc,pfe6vssh0qej,me4yghhmj26t
    2022-11-10 13:19:58.108,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
    2022-11-10 13:19:58.208,fn44tlt6rtt,8tm,8l06613lartf,bx5h3v9l1udg
    2022-11-10 13:19:59.208,oepwy0s4mjt,n6u,asdfasdfasdf,asdfasdfasdf
    2022-11-10 13:20:00.208,oepwy0s4mjt,n6u,oimlkmjhgggh,asdfiiiidddd\"
| eval _raw=data | multikv forceheader=1
| eval _time=strptime(time,\"%Y-%m-%d %H:%M:%S.%3N\")
```Everything from "|makeresults" to here defines test data.
   Replace with your search for Table2```
| sort - _time
```Look for the fields passed in from Table1```
| search field1=$efield1$ field2=$efield2$ _time<$etime$
```If more than one is found, pick the first (most recent)```
| head 1
```Use the values of _time and field4 from Table1```
| eval _time=$etime$, field4=\"$efield4$\"
```If nothing was found in Table2 then assign values```
| appendpipe [stats count | eval _time=$etime$, field1=\"$efield1$\", field2=\"$efield2$\", field4=\"$efield4$\", field_to_enrich1=\"FILLNULL\", field_to_enrich2=\"FILLNULL2\" | where count=0 | fields - count]
| table _time,field1,field2,field4,field_to_enrich1,field_to_enrich2"
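
Another option, offered as an untested sketch rather than a verified solution, is to avoid both join and map by combining the two result sets and carrying the Table2 values forward with streamstats. The field name source_table is a hypothetical marker introduced only for this example, and the sketch assumes field1, field2, and field3 are populated in both searches (drop field3 from the by clause if it is absent, as in the sample data). Sorting on source_table as a tie-breaker places a Table1 event ahead of a Table2 event with an identical _time, so only strictly earlier Table2 events should be picked up.

```
<<your search for Table1>>
| fields _time,field1,field2,field3,field4
| eval source_table="table1"
| append [
  <<your search for Table2>>
  | fields _time,field1,field2,field3,field_to_enrich1,field_to_enrich2
  | eval source_table="table2"
]
| sort 0 _time, source_table
| streamstats last(field_to_enrich1) as field_to_enrich1 last(field_to_enrich2) as field_to_enrich2 by field1 field2 field3
| where source_table="table1"
| fillnull value="FILLNULL" field_to_enrich1
| fillnull value="FILLNULL2" field_to_enrich2
| table _time,field1,field2,field3,field4,field_to_enrich1,field_to_enrich2
```

Because last() only considers events in which the field exists, each Table1 row should receive the values from the closest preceding Table2 row with the same keys, and rows with no earlier match fall through to fillnull. The 0 in sort removes the default cap on the number of sorted results. Note that append runs a subsearch, so the usual subsearch result limits apply to the Table2 search.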

Upvotes: 0
