Reputation: 7054
I have measurements over 10 years coming from 10 000 sensors. These are available as ASCII files stored in HDFS (to be improved, but that is not the topic of this request).
As a proof of concept, I compute the mean for a single sensor using Spark with the following Scala code, targeting Spark v1.6.1:
// Read file as text
val lines = sc.textFile("/data/sensor_1.dat")
// Drop header
val header = lines.first
val lines_clean = lines.filter(line => line != header)
// Compute mean
val values = lines_clean.map(_.split("\t").last.toDouble)
val mean = values.sum / values.count
Now, I want to apply this over 10 000 files, getting one mean value for each of my sensors. How should I proceed? Should I implement a loop? Can I handle an RDD at file level rather than at line level? Any better ideas?
Thanks!
Upvotes: 2
Views: 739
Reputation: 5220
You can just read the whole folder like this:
import org.apache.spark.sql.functions.input_file_name
import sqlContext.implicits._   // for the $"..." column syntax

val inputPath: String = "/data/"
val lines = sqlContext.read.text(inputPath)
  .select(input_file_name().alias("filename"), $"value")
  .rdd
Then you can process the file name and the value Strings the same way as you did in the question:
val linesClean = lines
  .filter(l => l.getString(1) != header)        // drop the header line of each file
  .map(l => (l.getString(0), l.getString(1)))   // (filename, line)

val meanForEachFile = linesClean.groupByKey().map {
  case (name, linesInFile) =>
    val values = linesInFile.map(v => v.split("\t").last.toDouble)
    (name, values.sum / values.size)
}
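If a single sensor has many lines, groupByKey materialises all of its values at once. A minimal alternative sketch (reusing the linesClean pairs above, not part of the original answer) folds each group into a running (sum, count) instead:
// Reduce each file's values to (sum, count), then divide, so no per-file collection is needed
val sumCount = linesClean
  .mapValues(line => (line.split("\t").last.toDouble, 1L))
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }

val meanPerSensor = sumCount.mapValues { case (sum, count) => sum / count }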
Upvotes: 0
Reputation: 28422
You can try the wholeTextFiles() method: it reads a whole directory and returns a pair RDD of (filename, content) pairs.
The filename then identifies the sensor, and the content can be processed in much the same way as before.
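A minimal sketch of that approach (the /data/*.dat path, the tab-separated layout and the single header line are assumptions taken from the question):
// One (filename, wholeFileContent) pair per sensor file
val files = sc.wholeTextFiles("/data/*.dat")

val meanPerSensor = files.map { case (filename, content) =>
  val samples = content.split("\n").drop(1)   // drop the header line
    .filter(_.nonEmpty)
    .map(_.split("\t").last.toDouble)
  (filename, samples.sum / samples.length)
}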
Upvotes: 1
Reputation: 25939
As the data in each file does not seem to include the sensor id, you'd probably want to use the wholeTextFiles option, which loads each file into a pair RDD where the key is the file name. It will mean more parsing, as you'd need to parse the file name to get the sensor name and split the file content into lines to get the samples, but at least you'd be able to distinguish which data belongs to which sensor.
You should note that the path you pass to wholeTextFiles (or textFile for that matter) can be a comma-separated list of paths, including wildcards, as in sc.wholeTextFiles("/dir/to/first/sensor,/dir/to/second/sensor,/sensor[0-10]*,/etc")
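A hedged sketch of both points (the file naming follows the /data/sensor_1.dat example in the question; the second path in the list is made up):
// Derive the sensor name from the full path returned by wholeTextFiles,
// e.g. "hdfs://namenode:9000/data/sensor_1.dat" -> "sensor_1"
def sensorId(path: String): String =
  path.split("/").last.stripSuffix(".dat")

// A comma-separated, wildcarded path list, keyed by the parsed sensor name
val bySensor = sc.wholeTextFiles("/data/sensor_*.dat,/archive/sensor_*.dat")
  .map { case (path, content) => (sensorId(path), content) }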
Upvotes: 0
Reputation: 1161
Since the data is stored in HDFS, it is already distributed over your cluster and Spark can achieve data parallelism; you write the code as if it were one directory, but you do need to think about partitioning. For the same reason, convert the data to Parquet if possible.
I strongly recommend using Datasets, as Spark will be able to optimise the computation.
import org.apache.spark.sql.functions._
import spark.implicits._   // needed for .as[Sensor]

case class Sensor(time: java.sql.Timestamp, value: Double)

val ds = spark.read
  .option("header", "true")
  .option("delimiter", "\t")
  .option("inferSchema", "true")   // otherwise every column comes back as a string
  .csv(s"hdfs://${master}:9000/data.tsv")
  .as[Sensor]
// depending on the time format, a timestampFormat option or an explicit schema may be needed
// tune the number of partitions if needed, e.g. ds.repartition(100)

val mean = ds.select(avg(col("value")).as("mean"))
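That gives a single global mean; since the question asks for one mean per sensor, a hedged follow-up sketch (assuming the read is pointed at the directory of per-sensor files rather than a single data.tsv, and that the source file identifies the sensor) groups by the input file:
// One mean per source file, i.e. per sensor
val meanPerSensor = ds
  .withColumn("sensor", input_file_name())
  .groupBy("sensor")
  .agg(avg(col("value")).as("mean"))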
Upvotes: 0