Reputation: 721
I have a list of words as a DStream, e.g. List(car, speed, accident, speed, bad). I want to form bigrams from this list. I have this working with RDDs but am facing issues with DStreams. I am using the foreachRDD function. Below is what I have so far.
I am trying to print the contents of the RDD after the transformation.
def printRDD(rddString: RDD[String]) = {
  val z = rddString
    .map(y => y.toString.split(",").filter(_.nonEmpty)
      .map(y => y.replaceAll("""\W""", "").toLowerCase)
      .filter(_.nonEmpty)
      .sliding(2).filter(_.size == 2).map { case Array(a, b) => ((a, b), 1) })
    .flatMap(x => x)
  println(z)
}
val x = lines.map(plainTextToLemmas(_, stopWords))
val words = x.flatMap( y=> y.toString.split(","))
words.foreachRDD( rdd => printRDD(rdd))
Is there any way to show the contents after the transformation in printRDD? Even with println(z) inside the function, the output is only MapPartitionsRDD[18] at flatMap. I am using Kafka with Spark Streaming to read the input, and I do get the words values on the console. I think the words are not changed by the call to printRDD.
Upvotes: 1
Views: 450
Reputation: 149618
You can do all of these operations on the DStream, not inside foreachRDD, and then call print on the DStream:
lines
  .map(plainTextToLemmas(_, stopWords))
  .flatMap { y =>
    // Build the bigrams per element: sliding is a Scala collection method, not a DStream method.
    y.toString.split(",")
      .map(_.replaceAll("""\W""", "").toLowerCase)
      .filter(_.nonEmpty)
      .sliding(2)
      .filter(_.size == 2)
      .map { case Array(a, b) => ((a, b), 1) }
  }
  .print()
This should print the content of the DStream to the console on the driver. Note that print() shows only the first ten elements of each batch.
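To check the bigram step on its own, without a streaming context, the same split, clean, and sliding logic can be run on a plain string. This is only a minimal sketch: the BigramSketch object and the bigrams helper are hypothetical names that do not appear in the question, and the input is assumed to be one comma-separated line like the example list above.

```scala
object BigramSketch {
  // Hypothetical helper (not from the original post): turns one
  // comma-separated line into ((first, second), 1) pairs.
  def bigrams(line: String): List[((String, String), Int)] =
    line.split(",")
      .map(_.replaceAll("""\W""", "").toLowerCase) // strip non-word characters
      .filter(_.nonEmpty)                          // drop empty tokens
      .sliding(2)                                  // windows of adjacent words
      .filter(_.length == 2)                       // skip a lone trailing word
      .map { case Array(a, b) => ((a, b), 1) }
      .toList

  def main(args: Array[String]): Unit =
    bigrams("car, speed, accident, speed, bad").foreach(println)
}
```

For the example list this should yield four pairs, starting with ((car,speed),1) and ending with ((speed,bad),1). A line with a single word yields no pairs.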
An important thing to note is that although you're operating on a DStream, its methods "drill into" the underlying RDD at the given batch time and expose the actual element type inside the RDD, so you shouldn't need foreachRDD to reach the data inside.
Upvotes: 1