Reputation: 1634
I have input lines like the ones below:
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2
and I want output rows like the ones below, where each value is the column-wise (vertical) sum of the corresponding numbers:
file1 : [ 1+1+5, 1+2+5, 1+3+5 ]
file2 : [ 2+1, 2+1, 2+2, 2+2 ]
Currently the aggregation logic works within a single batch interval, but it does not maintain state across batches. So I am adding updateStateByKey and passing it the function below. Is this the right way to do it?
My current program:
    def updateValues( newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]) = {
      val previousCount = currentValue.getOrElse(Array.fill[Byte](newValues.length)(0))
      val allValues = newValues +: previousCount
      Some(allValues.toList.transpose.map(_.sum).toArray)
    }

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(2))
      // parse the lines of data into coverage objects
      val inputStream = ssc.socketTextStream(<hostname>, 9999)
      ssc.checkpoint("<hostname>:8020/user/spark/checkpoints_dir")
      inputStream.print(10)
      val parsedDstream = inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
        })
      val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues)
      // Start the computation
      ssc.start()
      // Wait for the computation to terminate
      ssc.awaitTermination()
    }
For reference, my previous program (without stateful transformation):
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(2))
      val inputStream = ssc.socketTextStream("hostname", 9999)
      val parsedDstream = inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
        })
        .reduceByKey((first, second) => {
          val listOfArrays = ArrayBuffer(first, second)
          listOfArrays.toList.transpose.map(_.sum).toArray
        })
        .foreachRDD(rdd => rdd.foreach(Blaher.blah))
    }
Thanks in advance.
Upvotes: 3
Views: 424
Reputation: 330413
What you're looking for is updateStateByKey. For a DStream[(T, U)] it should take a function with two arguments:
- Seq[U] - the new values that arrived for the key in the current batch
- Option[U] - the accumulated state for the key, if any
and return an Option[U].
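To make that contract concrete, here is a minimal sketch in plain Scala, with no Spark or Breeze dependency. The names UpdateSketch and updateColumnSums are hypothetical, and padding shorter arrays with zeros is an assumption of this sketch rather than anything the question specifies. The two calls in main treat the file1 rows for t1 and t2 as if they arrived in separate batches, which is also just an assumption for illustration.

```scala
object UpdateSketch {
  // Hypothetical helper with the (Seq[U], Option[U]) => Option[U] shape
  // that updateStateByKey expects, here with U = Array[Int].
  def updateColumnSums(
      newValues: Seq[Array[Int]],
      state: Option[Array[Int]]): Option[Array[Int]] = {
    // Put the accumulated state (if any) in front of this batch's arrays.
    val all: Seq[Array[Int]] = state.toList ++ newValues
    if (all.isEmpty) {
      None // no state and no new values for this key
    } else {
      // Assumption: arrays of different lengths are padded with zeros.
      val width = all.map(_.length).max
      Some(Array.tabulate(width) { i =>
        all.map(a => if (i < a.length) a(i) else 0).sum
      })
    }
  }

  def main(args: Array[String]): Unit = {
    // file1 rows tagged t1, with no previous state
    val afterFirst = updateColumnSums(Seq(Array(1, 1, 1), Array(1, 2, 3)), None)
    println(afterFirst.map(_.mkString(", "))) // Some(2, 3, 4)

    // file1 row tagged t2, folded into the previous state
    val afterSecond = updateColumnSums(Seq(Array(5, 5, 5)), afterFirst)
    println(afterSecond.map(_.mkString(", "))) // Some(7, 8, 9)
  }
}
```

Returning the old state unchanged when a batch brings no new values keeps the key alive; returning None instead would drop the key from the state.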
Given your code, it could be implemented, for example, like this:
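    import breeze.linalg.{DenseVector => BDV}
    import org.apache.spark.streaming.dstream.DStream
    import scala.util.Try

    // parsedDstream is the DStream[(String, Array[Int])] from the question
    val state: DStream[(String, Array[Int])] = parsedDstream.updateStateByKey(
      (current: Seq[Array[Int]], prev: Option[Array[Int]]) => {
        // Prepend the previous state (if any) to this batch's arrays
        prev.map(_ +: current).orElse(Some(current))
          // Sum element-wise; a failure (e.g. nothing to reduce) yields None
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })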
To be able to use it, you'll have to configure checkpointing, as your current program already does with ssc.checkpoint(...).
Upvotes: 1