c-rod

Reputation: 43

How to parse log lines using Spark that could span multiple lines

I'm developing a Spark/Scala application that reads and parses a custom log file. I'm having trouble parsing multi-line log entries. Here's a snippet of my code:

case class MLog(dateTime: String, classification: String, serverType: String, identification:String, operation: String)
val PATTERN = """(?s)(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)\s+\[(.*)\]\s+\[(.*)\]\s+(.*)""".r


def parseLogLine(log: String): MLog = {
    val res = PATTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
        throw new RuntimeException("Cannot parse log line: " + log)
    }
    val m = res.get
    MLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5))
}

sc.textFile("/mydirectory/logfile").map(parseLogLine).foreach(println)

Some of the entries in the log file span multiple lines. The regex works fine for single-line entries, but when a multi-line entry like the one shown below is read,

2015-08-31 00:10:17,682 WARN  [ScheduledTask-10] [name=custname;mid=9999;ds=anyvalue;] datasource - Scheduled DataSource import failed.                 
com.xxx.common.service.ServiceException: system failure: Unable to connect to ANY server: LdapDataSource{id=xxx, type=xxx, enabled=true, name=xxx, host=xxx port=999, connectionType=ssl, username=xxx, folderId=99999}

I receive this error:

Cannot parse log line: com.xxx.common.service.ServiceException: system failure: Unable to connect to ANY server: LdapDataSource{id=xxx, type=xxx, enabled=true, name=xxx, host=xxx port=999, connectionType=ssl, username=xxx, folderId=99999}

How can I get Spark to read multi-line log entries from a log file?

Upvotes: 4

Views: 3404

Answers (1)

zero323

Reputation: 330353

Since the input files are small, you can use SparkContext.wholeTextFiles.

// Parse a single file and return all extracted entries
def parseLogFile(log: String): Iterator[MLog] = {
    val p: scala.util.matching.Regex = ???
    p.findAllMatchIn(log).map(
        m => MLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5))
    )
}

val rdd: RDD[MLog] = sc
   .wholeTextFiles("/path/to/input/dir")
   .flatMap{case (_, txt) => parseLogFile(txt)}
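
The ??? above is left for you to fill in. Purely as an illustration (an assumption about your format, not a definitive pattern), you could adapt the question's PATTERN so that each match anchors at a timestamp and lazily consumes the message up to the next timestamp or the end of the file, which keeps continuation lines attached to their entry:

// Illustration only: adjust to your real log format.
val p: scala.util.matching.Regex =
    ("""(?s)(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)\s+\[(.*?)\]\s+\[(.*?)\]\s+""" +
     """(.*?)(?=\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3}|\z)""").r
// (?s)  lets . cross newlines, so the message group can span lines
// (?=…) stops each match right before the next timestamp, or at end of input

With a pattern like this, findAllMatchIn walks the whole file content and yields one match per log entry, multi-line or not; the captured message may keep a trailing newline, which you can trim when building MLog if needed.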

Upvotes: 3
