miniQ

Reputation: 721

List processing in DStream

I have a list of words as a DStream, e.g. List(car, speed, accident, speed, bad), and I want to form bigrams from it. I have this working with RDDs but am running into issues with DStreams. I am using the foreachRDD function. Below is what I have:

I am trying to print the contents of the RDD after the transformation.

 def printRDD(rddString: RDD[String]) ={
      val z = rddString.map( y => y.toString.split(",").filter(_.nonEmpty).
        map( y => y.replaceAll("""\W""", "").toLowerCase)
        .filter(_.nonEmpty)
        .sliding(2).filter(_.size == 2).map{ case Array(a, b) => ((a, b), 1) })
        .flatMap(x => x)
        println(z)
}
 val x = lines.map(plainTextToLemmas(_, stopWords))
 val words = x.flatMap( y=> y.toString.split(","))
 words.foreachRDD( rdd => printRDD(rdd))

Is there any way to show the contents after the transformation in printRDD? Even with println(z) inside the function, the output is just MapPartitionsRDD[18] at flatMap. I am using Kafka with Spark Streaming to read the input, and I do see the words values on the console. I think the words are not changed by calling printRDD.

Upvotes: 1

Views: 450

Answers (1)

Yuval Itzchakov

Reputation: 149618

You can do all of these operations on the DStream itself rather than inside foreachRDD, and then call print on the DStream:

lines
  .map(plainTextToLemmas(_, stopWords))
  // Split each line into cleaned, lower-cased tokens
  .map { y =>
    y.toString.split(",")
      .map(_.replaceAll("""\W""", "").toLowerCase)
      .filter(_.nonEmpty)
  }
  // DStream has no sliding method, so form the bigrams inside each line's token array
  .flatMap { tokens =>
    tokens.sliding(2).filter(_.size == 2).map { case Array(a, b) => ((a, b), 1) }
  }
  .print()

This should print the contents of the DStream to the console on the driver (print() shows the first ten elements of each batch).

An important thing to note is that although you're operating on a DStream, its methods "drill into" the underlying RDD at the given batch time and expose the actual type inside the RDD, so you shouldn't need foreachRDD to reach the actual data inside.
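The bigram step can also be checked without a streaming context, since it is ordinary Scala collection code. Below is a minimal sketch, assuming each line arrives as one comma-separated string; the object name BigramSketch and the helper bigrams are hypothetical and not part of the original code.

```scala
// Plain Scala, no Spark required: the per-line bigram step in isolation.
object BigramSketch {
  // Hypothetical helper: clean the tokens, then pair each token with its successor.
  def bigrams(line: String): List[((String, String), Int)] =
    line
      .split(",")
      .map(_.replaceAll("""\W""", "").toLowerCase) // strip non-word characters
      .filter(_.nonEmpty)                          // drop empty tokens
      .sliding(2)                                  // windows of two adjacent tokens
      .filter(_.length == 2)                       // skip a short window from a one-token line
      .map { case Array(a, b) => ((a, b), 1) }
      .toList

  def main(args: Array[String]): Unit =
    bigrams("car, speed, accident, speed, bad").foreach(println)
}
```

The same function body could then be passed to flatMap on the DStream, so the streaming job and the local check share one definition.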

Upvotes: 1
