JaneD
JaneD

Reputation: 199

Logstash multiline not parsing Oracle log properly

I have been testing with Logstash pipeline to process Oracle multiline audit log that has the following format:

Audit file /u01/app/oracle/admin/DEVINST/adump/DEVINST_ora_15460_20201001230100743853143795.aud
Oracle Database 12c Standard Edition Release 12.2.0.1.0 - 64bit DEV
Build label:    RDBMS_12.2.0.1.0_LINUX.X64_170125
ORACLE_HOME:    /u01/app/oracle/product/12.2.0/dbhome_1
System name:    Linux
Node name:      testdevserver
Release:        3.10.0-862.14.4.el7.x86_64
Version:        #1 SMP Fri Sep 21 09:07:21 UTC 2018
Machine:        x86_64
Instance name: DEVINST
Redo thread mounted by this instance: 1
Oracle process number: 57
Unix process pid: 15460, image: oracle@testdevserver (TNS V1-V3)

Thu Oct  1 23:01:00 2020 +00:00
LENGTH : '275'
ACTION :[7] 'CONNECT'
DATABASE USER:[1] '/'
PRIVILEGE :[6] 'SYSDBA'
CLIENT USER:[9] 'test_user'
CLIENT TERMINAL:[5] 'pts/0'
STATUS:[1] '0'
DBID:[10] '1762369616'
SESSIONID:[10] '4294967295'
USERHOST:[21] 'testdevserver'
CLIENT ADDRESS:[0] ''
ACTION NUMBER:[3] '100'

Thu Oct  1 23:01:00 2020 +00:00
LENGTH : '296'
ACTION :[29] 'SELECT STATUS FROM V$INSTANCE'
DATABASE USER:[1] '/'
PRIVILEGE :[6] 'SYSDBA'
CLIENT USER:[9] 'test_user'
CLIENT TERMINAL:[5] 'pts/0'
STATUS:[1] '0'
DBID:[10] '1762369616'
SESSIONID:[10] '4294967295'
USERHOST:[21] 'testdevserver'
CLIENT ADDRESS:[0] ''
ACTION NUMBER:[1] '3'

My /etc/logstash/conf.d/25-filter.conf :

filter {
    grok {
      match => { "message" => "(?<day>^[A-Za-z]{3}) (?<month>[A-Za-z]{3}) (?<monthday>[0-9]{2}) (?<hour>[0-9]{2}):(?<min>[0-9]{2}):(?<sec>[0-9]{2}) (?<year>[0-9]{4}) %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
      add_tag => [ "oracle_audit" ]
    }
    grok {
    match => { "ACTION :\[[0-9]*\] '(?<ora_audit_action>.*)'.*DATABASE USER:\[[0-9]*\] '(?<ora_audit_dbuser>.*)'.*PRIVILEGE :\[[0-9]*\] '(?<ora_audit_priv>.*)'.*CLIENT USER:\[[0-9]*\] '(?<ora_audit_osuser>.*)'.*CLIENT TERMINAL:\[[0-9]*\] '(?<ora_audit_term>.*)'.*STATUS:\[[0-9]*\] '(?<ora_audit_status>.*)'.*DBID:\[[0-9]*\] '(?<ora_audit_dbid>.*)'.*SESSIONID:\[[0-9]*\] '(?<ora_audit_sessionid>.*)'.*USERHOST:\[[0-9]*\] '(?<ora_audit_dbhost>.*)'.*CLIENT ADDRESS:\[[0-9]*\] '(?<ora_audit_clientaddr>.*)'.*ACTION NUMBER:\[[0-9]*\] '(?<ora_audit_actionnum>.*)'" }
    }
    grok {
      match => { "source" => [ ".*/[a-zA-Z0-9_#$]*_[a-z0-9]*_(?<ora_audit_derived_pid>[0-9]*)_[0-9]*\.aud" ] }
    }
    mutate {
      add_field => { "timestamp" => "%{year}-%{day}-%{monthday} %{hour}:%{min}:%{sec}" }
    }
    date {
      locale => "en"
      match => [ "timestamp", "YYYY-MMM-dd HH:mm:ss" ]
    }

    mutate {
      remove_field => [ "timestamp", "year", "day", "monthday", "hour", "min", "sec" ]
    }
}

My /etc/logstash/conf.d/000-file-in.conf file :

input {
    file {
        path => [ "/tmp/testora" ]
        start_position => "beginning"
        codec => multiline
        {
                pattern => "^[A-Za-z]{3} [A-Za-z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4}"
                negate => "true"
                what => "previous"
         }
    }
}

I then tested this by running :

/usr/share/logstash/bin/logstash -r -f /etc/logstash/conf.d/ :

....

[INFO ] 2020-10-05 11:16:30.656 [Converge PipelineAction::Reload<main>] reload - Reloading pipeline {"pipeline.id"=>:main}
[INFO ] 2020-10-05 11:16:30.662 [Converge PipelineAction::Reload<main>] observingtail - QUIT - closing all files and shutting down.
{
         "ora_audit_dbid" => "1762369616",
    "ora_audit_actionnum" => "3",
    "ora_audit_sessionid" => "4294967295",
               "@version" => "1",
                   "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure",
        [2] "_dateparsefailure"
    ],
                   "path" => "/tmp/testora",
       "ora_audit_action" => [
        [0] "275'\nACTION :[7] 'CONNECT'\nDATABASE USER:[1] '/'\nPRIVILEGE :[6] 'SYSDBA'\nCLIENT USER:[9] 'test_user'\nCLIENT TERMINAL:[5] 'pts/0'\nSTATUS:[1] '0'\nDBID:[10] '1762369616'\nSESSIONID:[10] '4294967295'\nUSERHOST:[21] 'testdevserver'\nCLIENT ADDRESS:[0] ''\nACTION NUMBER:[3] '100'\nThu Oct  1 23:01:00 2020 +00:00\nLENGTH : '296",
        [1] "SELECT STATUS FROM V$INSTANCE"
    ],
             "@timestamp" => 2020-10-05T01:16:30.889Z,
         "ora_audit_priv" => "SYSDBA",
                "message" => "Audit file /u01/app/oracle/admin/DEVINST/adump/DEVINST_ora_15460_20201001230100743853143795.aud\nOracle Database 12c Standard Edition Release 12.2.0.1.0 - 64bit DEV\nBuild label:    RDBMS_12.2.0.1.0_LINUX.X64_170125\nORACLE_HOME:    /u01/app/oracle/product/12.2.0/dbhome_1\nSystem name:    Linux\nNode name:      testdevserver\nRelease:        3.10.0-862.14.4.el7.x86_64\nVersion:        #1 SMP Fri Sep 21 09:07:21 UTC 2018\nMachine:        x86_64\nInstance name: DEVINST\nRedo thread mounted by this instance: 1\nOracle process number: 57\nUnix process pid: 15460, image: oracle@testdevserver (TNS V1-V3)\nThu Oct  1 23:01:00 2020 +00:00\nLENGTH : '275'\nACTION :[7] 'CONNECT'\nDATABASE USER:[1] '/'\nPRIVILEGE :[6] 'SYSDBA'\nCLIENT USER:[9] 'test_user'\nCLIENT TERMINAL:[5] 'pts/0'\nSTATUS:[1] '0'\nDBID:[10] '1762369616'\nSESSIONID:[10] '4294967295'\nUSERHOST:[21] 'testdevserver'\nCLIENT ADDRESS:[0] ''\nACTION NUMBER:[3] '100'\nThu Oct  1 23:01:00 2020 +00:00\nLENGTH : '296'\nACTION :[29] 'SELECT STATUS FROM V$INSTANCE'\nDATABASE USER:[1] '/'\nPRIVILEGE :[6] 'SYSDBA'\nCLIENT USER:[9] 'test_user'\nCLIENT TERMINAL:[5] 'pts/0'\nSTATUS:[1] '0'\nDBID:[10] '1762369616'\nSESSIONID:[10] '4294967295'\nUSERHOST:[21] 'testdevserver'\nCLIENT ADDRESS:[0] ''\nACTION NUMBER:[1] '3'",
       "ora_audit_dbhost" => "testdevserver",
                   "host" => "myhost",
       "ora_audit_dbuser" => "/",
       "ora_audit_osuser" => "test_user",
         "ora_audit_term" => "pts/0",
       "ora_audit_status" => "0"
}

Unfortunately this isn't what I was expecting. Somehow it is not chunking and parsing the messages correctly. I was expecting something like:

         "ora_audit_action" => "CONNECT",
       "ora_audit_dbuser" => "/",
         "ora_audit_dbid" => "1762369616",
       "ora_audit_status" => "0",
       "ora_audit_osuser" => "test_user",
         "ora_audit_priv" => "SYSDBA",
         "ora_audit_term" => "pts/0",
    "ora_audit_sessionid" => "4294967295",
       "ora_audit_dbhost" => "testdevserver",
       "ora_audit_clientaddr" => "",
    "ora_audit_actionnum" => "3",
                   "host" => "myhost",
               "@version" => "1",
             "@timestamp" => 2020-10-05T01:16:30.889Z,
                "message" => "Thu Oct  1 23:01:00 2020 +00:00\nLENGTH : '275'\nACTION :[7] 'CONNECT'\nDATABASE USER:[1] '/'\nPRIVILEGE :[6] 'SYSDBA'\nCLIENT USER:[9] 'test_user'\nCLIENT TERMINAL:[5] 'pts/0'\nSTATUS:[1] '0'\nDBID:[10] '1762369616'\nSESSIONID:[10] '4294967295'\nUSERHOST:[21] 'testdevserver'\nCLIENT ADDRESS:[0] ''\nACTION NUMBER:[3] '100'\nThu Oct  1 23:01:00 2020 +00:00\nLENGTH : '296'\nACTION :[29] 'SELECT STATUS FROM V$INSTANCE'\nDATABASE USER:[1] '/'\nPRIVILEGE :[6] 'SYSDBA'\nCLIENT USER:[9] 'test_user'\nCLIENT TERMINAL:[5] 'pts/0'\nSTATUS:[1] '0'\nDBID:[10] '1762369616'\nSESSIONID:[10] '4294967295'\nUSERHOST:[21] 'testdevserver'\nCLIENT ADDRESS:[0] ''\nACTION NUMBER:[1] '3'"

And I am seeing some of this info, however I was also expecting it to spit out 2 "chunks" of messages (matching the pattern ^[A-Za-z]{3} [A-Za-z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4} as the start of the line, e.g. Thu Oct 1 23:01:00 2020 +00:00) - instead it looks like it's seeing only one? I think my pattern matching might be the issue here; I'd appreciate it if anyone can offer any hints.

Also, I'm not sure what generated these errors: [1] "_grokparsefailure", [2] "_dateparsefailure" - obviously it's not parsing things quite right, but I just don't know how to go about fixing it.

Help :(

Thanks J

Upvotes: 1

Views: 532

Answers (3)

karan shah
karan shah

Reputation: 370

I think the below pattern should work in your multiline codec and give the required grouping.

pattern => "^([A-Za-z]{3})(\s*)([A-Za-z]{3})(\s*)([0-9]{1,2})(\s*)([0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4})"

Also, the below grok filter should give you the desired breakdown of the multiline message. You might want to rename the keys to your liking.


input {
    file {
        path => [ "/tmp/testora" ]
        start_position => "beginning"
        codec => multiline
        {
            pattern => "^([A-Za-z]{3})(\s*)([A-Za-z]{3})(\s*)([0-9]{1,2})(\s*)([0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4})"
            negate => "true"
            what => "previous"
         }
    }
}
filter {
    grok {
      match => { "message" => "(?<day>[A-Za-z]{3})%{SPACE}(?<month>[A-Za-z]{3})%{SPACE}(?<monthday>[0-9]{1,2})%{SPACE}(?<hour>[0-9]{1,2}):(?<min>[0-9]{1,2}):(?<sec>[0-9]{2})%{SPACE}(?<year>[0-9]{4})%{SPACE}%{GREEDYDATA:message}" }
      overwrite => [ "message" ]
      add_tag => [ "oracle_audit" ]
    }
    kv {
        field_split => "\n"
        value_split_pattern => "\s*:\s*\[[0-9]*\]\s*"
        source => "message"
    }
    mutate {
      add_field => { "timestamp" => "%{year}-%{month}-%{monthday} %{hour}:%{min}:%{sec}" }
    }
    date {
      locale => "en"
      match => [ "timestamp", "YYYY-MMM-dd HH:mm:ss" ]
    }
    mutate {
      remove_field => [ "timestamp", "year", "day", "monthday", "month", "hour", "min", "sec" ]
    }
}
output {
    stdout {}
}

Sample output of the pipeline is as below

{
          "PRIVILEGE" => "SYSDBA",
               "DBID" => "1762369616",
      "ACTION NUMBER" => "100",
           "USERHOST" => "testdevserver",
           "@version" => "1",
               "path" => "/usr/share/logstash/stack/data/file.txt",
          "SESSIONID" => "4294967295",
        "CLIENT USER" => "test_user",
    "CLIENT TERMINAL" => "pts/0",
             "STATUS" => "0",
               "host" => "e8f97b3e53ff",
               "tags" => [
        [0] "multiline",
        [1] "oracle_audit"
    ],
             "ACTION" => "CONNECT",
      "DATABASE USER" => "/",
         "@timestamp" => 2020-10-01T23:01:00.000Z,
            "message" => "+00:00\nLENGTH : '275'\nACTION :[7] 'CONNECT'\nDATABASE USER:[1] '/'\nPRIVILEGE :[6] 'SYSDBA'\nCLIENT USER:[9] 'test_user'\nCLIENT TERMINAL:[5] 'pts/0'\nSTATUS:[1] '0'\nDBID:[10] '1762369616'\nSESSIONID:[10] '4294967295'\nUSERHOST:[21] 'testdevserver'\nCLIENT ADDRESS:[0] ''\nACTION NUMBER:[3] '100'"
}

Upvotes: 1

Sourav
Sourav

Reputation: 3392

Find below the Grok pattern that matches Thu Oct 1 23:01:00 2020 +00:00

(?<timestamp>%{DAY} %{MONTH}  %{MONTHDAY} %{TIME})

Screenshot of the output:

enter image description here

Upvotes: 1

chitresh
chitresh

Reputation: 346

Based on your requirement, you have to modify the grok pattern in your second grok filter. For both the LENGTH and ACTION parameters you are creating the same field, ora_audit_action, so it will append the data and form an array; it is better to create a separate field for each parameter.

grok {
match => { "msg1" => "LENGTH : '(?<ora_audit_length>.*)'.ACTION :\[[0-9]*\] '(?<ora_audit_action>.*)'.*DATABASE USER:\[[0-9]*\] '(?<ora_audit_dbuser>.*)'.*PRIVILEGE :\[[0-9]*\] '(?<ora_audit_priv>.*)'.*CLIENT USER:\[[0-9]*\] '(?<ora_audit_osuser>.*)'.*CLIENT TERMINAL:\[[0-9]*\] '(?<ora_audit_term>.*)'.*STATUS:\[[0-9]*\] '(?<ora_audit_status>.*)'.*DBID:\[[0-9]*\] '(?<ora_audit_dbid>.*)'.*SESSIONID:\[[0-9]*\] '(?<ora_audit_sessionid>.*)'.*USERHOST:\[[0-9]*\] '(?<ora_audit_dbhost>.*)'.*CLIENT ADDRESS:\[[0-9]*\] '(?<ora_audit_clientaddr>.*)'.*ACTION NUMBER:\[[0-9]*\] '(?<ora_audit_actionnum>.*)'" }
}

Instead of removing fields, you can whitelist the necessary fields by using the prune filter.

prune {
     whitelist_names => ["ora_audit_action", "ora_audit_dbuser", "ora_audit_dbid", "ora_audit_status", "ora_audit_osuser", "ora_audit_priv", "ora_audit_term", "ora_audit_sessionid", "ora_audit_dbhost", "ora_audit_clientaddr", "ora_audit_actionnum", "host", "@timestamp", "@version", "message"]
}

In your code, you are segregating the timestamp and re-forming it, but in the filter part you are removing that and using the default @timestamp field; if you do not need this, you can remove the unnecessary code.

Upvotes: 0

Related Questions