Reputation: 1634
I am new in spark. I wanted to do spark streaming setup to retrieve key value pairs of below format files:
file: info1
Note: Each info file will have around of 1000 of these records. And our system is continuously generating these info files. Through, spark streaming i wanted to do mapping of line numbers and info files and wanted to get aggregate result.
Can we give input to spark cluster these kind of files? I am interested in the "SF" and "DA" delimiters only, "SF" corresponds to source file and "DA" corresponds the ( line number, count).
As this input data is not the line format, so is this the good idea to use these files for the spark input or should i need to do some intermediary stage where i need to clean these files to generate new files which will have each record information in line instead of blocks?
Or can we achieve this in Spark itself? What should be the right approach?
What i wanted to achieve? I wanted to get line level information. Means, to get line (As a key) and info files (as values)
Final output i wanted is like below: line178 -> (info1, info2, info7.................)
line 2908 -> (info3, info90, ..., ... ,)
Do let me know if my explanation is not clear or if i am missing something.
Thanks & Regards, Vinti
Upvotes: 1
Views: 88
Reputation: 1071
You could do something like this. Having your DStream stream:
// this gives you DA & FP lines, with the line number as the key
val validLines = stream.map(_.split(":")).
filter(line => Seq("DA", "FP").contains(line._1)).
map(_._2.split(","))
map(line => (line._1, line._2))
// now you should accumulate values
val state = validLines.updateStateByKey[Seq[String]](updateFunction _)
def updateFunction(newValues: Seq[Seq[String]], runningValues: Option[Seq[String]]): Option[Seq[String]] = {
// add the new values
val newVals = runnigValues match {
case Some(list) => list :: newValues
case _ => newValues
}
Some(newVals)
}
This should accumulate for each key a sequence with the values associated, storing it in state
Upvotes: 1