Amresh

Reputation: 13

Scala RDD with pattern matching from a text file

I'm kind of new to Spark. Here's the problem: I have a txt file with a general format, let's say: Time : Message. So I have to implement two things: reading the file into an RDD, and pattern groups and matching.

To read the file into an RDD:

val rdd1 = sc.textFile(location)

To build the pattern:

import java.util.regex.{Matcher, Pattern}

private val f1 = "([0-9]*)"
private val f2 = "([:])"
private val f3 = "(.*?)"
private val regex = s"$f1 $f2 $f3"
private val p = Pattern.compile(regex)

Now I want to integrate both of these:

rdd1.map(//What to do here) 

I want to check whether each line matches the general format. If it doesn't, I want to display an error message for each non-matching line.

If it does match, I want to capture groups for the pattern stated above: f1 is group 1, f2 is group 2, f3 is group 3. Finally, I want to search f3 (the message field) for keywords like error or failure.

I know this is a lot to ask for. Thanks in advance.

What I have already tried:

def parseLine(s1: String): Option[Line] = {
  val matcher = p.matcher(s1)
  if (matcher.find) {
    println("General Format correct")
    //after the format is verified, search f3 for the error/failure keywords
    Some(group(matcher))
  } else {
    println("Format Wrong")
    None
  }
}

def group(matcher: Matcher) =
  Line(
    matcher.group(1),
    matcher.group(2),
    matcher.group(3))

case class Line(Time: String, colon: String, Message: String)

Now I am stuck on how to iterate over the RDD so that each line of the text file is passed to the function. If I pass the entire RDD (i.e. the RDD[String] type) to the function, other elements like the matcher don't work because they expect a String. From what I have read about the RDD functions (correct me if I am wrong), the foreach method should iterate over the RDD, but I get type mismatches. I am currently trying the map function but haven't got it working yet.
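To be concrete, the shape I am aiming for is roughly this (just a sketch of the intent, using the parseLine above, so I am not sure it is the right approach):

val parsed = rdd1.map(line => parseLine(line))

//keep only the lines that matched the general format
val good = parsed.filter(_.isDefined).map(_.get)

//search the message field (f3) for the keywords
val withErrors = good.filter(l => l.Message.contains("error") || l.Message.contains("failure"))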

As I said, I am new to Spark RDDs. I don't know whether using a partition function would help me here instead of grouping.

I really need some guidance from the experienced. Any help is appreciated.

Upvotes: 1

Views: 5449

Answers (1)

mattinbits

Reputation: 10428

With simple examples like this, often the way you'd do it with an RDD is the same as how you'd do it with a simple Scala sequence. If we take Spark out of the equation, an approach would be:

import scala.util.{Failure, Success}

val input = List(
  "123 : Message1 Error",
  "This line doesn't conform",
  "345 : Message2",
  "Neither does this one",
  "789 : Message3 Error"
)

val f1 = "([0-9]*)"
val f2 = "([:])"
val f3 = "(.*)"
val regex = s"$f1 $f2 $f3".r

case class Line(a: String, b: String, c: String)


//Use Success and Failure as the functional way of representing
//operations which might not succeed
val parsed = input.map { str =>
  regex.findFirstMatchIn(str).map(m => Line(m.group(1), m.group(2), m.group(3))) match {
    case Some(l) => Success(l)
    case None => Failure(new Exception(s"Non matching input: $str"))
  }
}

//Now split the parsed result so we can handle the two types of outcome separately
val matching = parsed.filter(_.isSuccess)
val nonMatching = parsed.filter(_.isFailure)

nonMatching.foreach(println)

//Filter for only those messages we're interested in
val messagesWithError = matching.collect{
  case Success(l @ Line(_,_,m)) if m.contains("Error") => l
}

messagesWithError.foreach(println)

What's different when we do it in Spark? Not much:

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Example"))

  import scala.util.{Failure, Success}

  val input = List(
    "123 : Message1 Error",
    "This line doesn't conform",
    "345 : Message2",
    "Neither does this one",
    "789 : Message3 Error"
  )

  val f1 = "([0-9]*)"
  val f2 = "([:])"
  val f3 = "(.*)"
  val regex = s"$f1 $f2 $f3".r

  case class Line(a: String, b: String, c: String)

  val inputRDD = sc.parallelize(input)

  //Use Success and Failure as the functional way of representing
  //operations which might not succeed
  val parsedRDD = inputRDD.map { str =>
    regex.findFirstMatchIn(str).map(m => Line(m.group(1), m.group(2), m.group(3))) match {
      case Some(l) => Success(l)
      case None => Failure(new Exception(s"Non matching input: $str"))
    }
  }

  //Now split the parsed result so we can handle the two types of outcome separately
  val matchingRDD = parsedRDD.filter(_.isSuccess)
  val nonMatchingRDD = parsedRDD.filter(_.isFailure)

  //We use collect() to bring the results back from the Spark workers to the Driver
  nonMatchingRDD.collect().foreach(println)

  //Filter for only those messages we're interested in
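  //(note: this collect takes a partial function and is a transformation returning another RDD,
  // unlike the parameterless collect() action used above and below)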
  val messagesWithError = matchingRDD.collect {
    case Success(l@Line(_, _, m)) if m.contains("Error") => l
  }

  //We use collect() to bring the results back from the Spark workers to the Driver
  messagesWithError.collect().foreach(println)

If the resulting data sets were very large, using collect() to bring the results to the driver would not be appropriate, but neither would the use of println to log the results.
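For large data, one option (just a sketch; the output paths below are placeholders) would be to have the workers write the results out directly rather than collecting them, for example with saveAsTextFile:

//Write results from the workers instead of collecting them to the driver.
//saveAsTextFile creates a directory of part files at each path.
nonMatchingRDD.map(_.failed.get.getMessage).saveAsTextFile("output/non-matching")
messagesWithError.map(_.toString).saveAsTextFile("output/errors-found")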

Upvotes: 2
