40339109
40339109

Reputation: 7

Logstash content based filtering, into multiple indexs

I am currently pulling JSON log files from an S3 bucket which contain different types of logs defined as RawLog, along with another value which is MessageSourceType (there are more metadata fields which I don't care about). Each line on the file is a separate log in case that makes a difference.

I currently have these all going into 1 index as seen in my config below, however, I ideally want to split these out into separate indexes. For example, if the MessageSourceType = Syslog - Linux Host then I need logstash to extract the RawLog as syslog and place it into an index called logs-syslog, whereas if the MessageSourceType = MS Windows Event Logging XML I want it to extract the RawLog as XML and place it in an index called logs-MS_Event_logs.

filter {
   mutate {
     replace => [ "message", "%{message}" ]
   }
   json {
        source => "message"
        remove_field => "message"
   }
}

output {
   elasticsearch {
     hosts => ["http://xx.xx.xx.xx:xxxx","http://xx.xx.xx.xx:xxxx"]
     index => "logs-received"
 }

Also for a bit of context here is an example of one of the logs:

{"MsgClassTypeId":"3000","Direction":"0","ImpactedZoneEnum":"0","message":"<30>Feb 13 23:45:24 xx.xx.xx.xx Account=\"\" Action=\"\" Aggregate=\"False\" Amount=\"\" Archive=\"True\" BytesIn=\"\" BytesOut=\"\" CollectionSequence=\"825328\" Command=\"\" CommonEventId=\"3\" CommonEventName=\"General Operations\" CVE=\"\" DateInserted=\"2/13/2021 11:45:24 PM\" DInterface=\"\" DIP=\"\" Direction=\"0\" DirectionName=\"Unknown\" DMAC=\"\" DName=\"\" DNameParsed=\"\" DNameResolved=\"\" DNATIP=\"\" DNATPort=\"-1\" Domain=\"\" DomainOrigin=\"\" DPort=\"-1\" DropLog=\"False\" DropRaw=\"False\" Duration=\"\" EntityId=\"" EventClassification=\"-1\" EventCommonEventID=\"-1\" FalseAlarmRating=\"0\" Forward=\"False\" ForwardToLogMart=\"False\" GLPRAssignedRBP=\"-1\" Group=\"\" HasBeenInserted_EMDB=\"False\" HasBeenQueued_Archiving=\"True\" HasBeenQueued_EventProcessor=\"False\" HasBeenQueued_LogProcessor=\"True\" Hash=\"\" HostID=\"44\" IgnoreGlobalRBPCriteria=\"False\" ImpactedEntityId=\"0\" ImpactedEntityName=\"\" ImpactedHostId=\"-1\" ImpactedHostName=\"\" ImpactedLocationKey=\"\" ImpactedLocationName=\"\" ImpactedNetworkId=\"-1\" ImpactedNetworkName=\"\" ImpactedZoneEnum=\"0\" ImpactedZoneName=\"\" IsDNameParsedValue=\"True\" IsRemote=\"True\" IsSNameParsedValue=\"True\" ItemsIn=\"\" ItemsOut=\"\" LDSVERSION=\"1.1\" Login=\"\" LogMartMode=\"13627389\" LogSourceId=\"158\" LogSourceName=\"ip-xx-xx-xx-xx.eu-west-2.computer.internal Linux Syslog\" MediatorMsgID=\"0\" MediatorSessionID=\"1640\" MsgClassId=\"3999\" MsgClassName=\"Other Operations\" MsgClassTypeId=\"3000\" MsgClassTypeName=\"Operations\" MsgCount=\"1\" MsgDate=\"2021-02-13T23:45:24.0000000+00:00\" MsgDateOrigin=\"0\" MsgSourceHostID=\"44\" MsgSourceTypeId=\"88\" MsgSourceTypeName=\"Syslog - Linux Host\" NormalMsgDate=\"2021-02-13T23:45:24.0540000Z\" Object=\"\" ObjectName=\"\" ObjectType=\"\" OriginEntityId=\"0\" OriginEntityName=\"\" OriginHostId=\"-1\" OriginHostName=\"\" OriginLocationKey=\"\" OriginLocationName=\"\" OriginNetworkId=\"-1\" OriginNetworkName=\"\" OriginZoneEnum=\"0\" OriginZoneName=\"\" ParentProcessId=\"\" ParentProcessName=\"\" ParentProcessPath=\"\" PID=\"-1\" Policy=\"\" Priority=\"4\" Process=\"\" ProtocolId=\"-1\" ProtocolName=\"\" Quantity=\"\" Rate=\"\" Reason=\"\" Recipient=\"\" RecipientIdentity=\"\" RecipientIdentityCompany=\"\" RecipientIdentityDepartment=\"\" RecipientIdentityDomain=\"\" RecipientIdentityID=\"-1\" RecipientIdentityTitle=\"\" ResolvedImpactedName=\"\" ResolvedOriginName=\"\" ResponseCode=\"\" Result=\"\" RiskRating=\"0\" RootEntityId=\"9\" Sender=\"\" SenderIdentity=\"\" SenderIdentityCompany=\"\" SenderIdentityDepartment=\"\" SenderIdentityDomain=\"\" SenderIdentityID=\"-1\" SenderIdentityTitle=\"\" SerialNumber=\"\" ServiceId=\"-1\" ServiceName=\"\" Session=\"\" SessionType=\"\" Severity=\"\" SInterface=\"\" SIP=\"\" Size=\"\" SMAC=\"\" SName=\"\" SNameParsed=\"\" SNameResolved=\"\" SNATIP=\"\" SNATPort=\"-1\" SPort=\"-1\" Status=\"\" Subject=\"\" SystemMonitorID=\"9\" ThreatId=\"\" ThreatName=\"\" UniqueID=\"7d4c4ed3-a2fc-44bc-a7ec-0b8b68e7f456\" URL=\"\" UserAgent=\"\" UserImpactedIdentity=\"\" UserImpactedIdentityCompany=\"\" UserImpactedIdentityDomain=\"\" UserImpactedIdentityID=\"-1\" UserImpactedIdentityTitle=\"\" UserOriginIdentity=\"\" UserOriginIdentityCompany=\"\" UserOriginIdentityDepartment=\"\" UserOriginIdentityDomain=\"\" UserOriginIdentityID=\"-1\" UserOriginIdentityTitle=\"\" VendorInfo=\"\" VendorMsgID=\"\" Version=\"\" RawLog=\"02 13 2021 23:45:24 xx.xx.xx.xx <SYSD:INFO> Feb 13 23:45:24 euw2-ec2--001 metricbeat[3031]: 2021-02-13T23:45:24.264Z#011ERROR#011[logstash.node_stats]#011node_stats/node_stats.go:73#011error making http request: Get \\\"https://xx.xx.xx.xx:9600/\\\": dial tcp xx.xx.xx.xx:9600: connect: connection refused\"","CollectionSequence":"825328","NormalMsgDate":"2021-02-13T23:45:24.0540000Z"}

I am a little unsure of the best way to achieve this and thought you guys might have some suggestions. I have looked into grok and think this may achieve my objective however I'm unsure where to start.

Upvotes: 0

Views: 284

Answers (1)

Val
Val

Reputation: 217254

You can do this with conditionals in your filter section and define the target index according to the type of logs you're parsing.

filter {
  ... other filters ...

  if [MsgSourceTypeName] == "Syslog - Linux Host" {
    mutate {
      add_field => {
        "[@metadata][target_index]" => "logs-syslog"
      }
    }
  }
  else if [MsgSourceTypeName] == "MS Windows Event Logging XML" {
    mutate {
      add_field => {
        "[@metadata][target_index]" => "logs-ms_event_log"
      }
    }
  }
}
output {
   elasticsearch {
     hosts => ["http://xx.xx.xx.xx:xxxx","http://xx.xx.xx.xx:xxxx"]
     index => "%{[@metadata][target_index]}"
   }
}

Upvotes: 1

Related Questions