RandomCoder
RandomCoder

Reputation: 7054

Distribute Spark processing independently over files

I have measurements over 10 years coming from 10 000 sensors. This is available as ASCII files stored in HDFS (to be improved, not the topic of this request):

As a proof of concept, I compute the mean for a sensor using Spark with the following SCALA code targeting Spark v1.6.1

// Read file as text
val lines = sc.textFile("/data/sensor_1.dat")
// Drop header
val header = lines.first
val lines_clean = lines.filter(line => line != header)
// Compute mean
val values = lines_clean.map(_.split("\t").last.toDouble)
val mean = values.sum / values.count

Now, I want to apply it over 10 000 files, getting one mean value for each of my sensors. How should I proceed ? Should I implement a loop ? Can I handle a RDD at file level, and not file line level ? Any better ideas ?

Thanks !

Upvotes: 2

Views: 739

Answers (4)

jamborta
jamborta

Reputation: 5220

You can just read the whole folder like this:

import org.apache.spark.sql.functions.input_file_name

val inputPath: String = "/data/"

val lines = sqlContext.read.text(inputPath)
  .select(input_file_name.alias("filename"), $"value")
  .rdd

Then you can process the String for the file name and the value the same way as you did in the question:

val linesClean = lines.filter(l => l.getString(1) != header).map(l => (l.getString(0), l.getString(1)))
val meanForEachFile = linesClean.groupByKey().map{
    case (name, linesInFile) => 
    val values = linesInFile.map(v => v.split("\t").last.toDouble)
    val mean = values.sum / values.count
    mean
}

Upvotes: 0

Shaido
Shaido

Reputation: 28422

You can try using the wholeTextFiles() method (here), it reads a whole directory and returns a pair RDD with (filename, content) pairs.

The filename would then be the sensor and the content can be processed in a similar way to before.

Upvotes: 1

Arnon Rotem-Gal-Oz
Arnon Rotem-Gal-Oz

Reputation: 25939

As the data in each file does not seem to include the sensor id you'd probably want to use the wholeTextFiles option which will load each file into an pairRDD where the key is the file name. It will mean more parsing as you'd need to parse the to get the sensor name and split the whole file from the value to get the samples - but at least you'd be able to distinguish which data belongs to which sensor.

You should note that the path you pass to wholeTextFiles (or textFile for that matter) can be a list of path including wildcard as in sc.wholeTextFiles("/dir/to/first/sensor,/dir/to/second/sensor,/sensor[0-10]*,/etc")

Upvotes: 0

elcomendante
elcomendante

Reputation: 1161

Data is stored in hdfs it means that it is distributed over your cluster and spark can achieve data parallelism but you write code as it was one directory, well you need to worry about partitions. Also transform data into parquet for that very reason if possible.

I strongly recommend to use datasets as spark will be able to optimise the computation.

import org.apache.spark.sql.functions._

case class Sensor(time: java.sql.Timestamp, value: Double)

val ds = spark.read
  .option("header", "true")
  .option("delimiter", "\t")
  .csv(s"hdfs://${master}:9000/data.tsv")
.as[Sensor] 
 // tune by partition number   .partition(100)

val mean = ds.select(avg(col("value")).as("mean"))

Upvotes: 0

Related Questions