mcmcmc

Reputation: 641

How to add source file name to each row in Spark?

I'm new to Spark and am trying to add a column to each input row containing the name of the file it comes from.

I've seen others ask similar questions, but all of their answers used wholeTextFiles, and I'm trying to do this for larger CSV files (read using the spark-csv library), JSON files, and Parquet files (not just small text files).

I can use the spark-shell to get a list of the filenames:

import org.apache.spark.sql.functions._

val df = sqlContext.read.parquet("/blah/dir")
val names = df.select(inputFileName())
names.show

but that's a separate DataFrame. I'm not sure how to add it as a column to each row (nor whether that result is ordered the same as the initial data, though I assume it is), or how to do this as a general solution for all input types.

Upvotes: 14

Views: 21526

Answers (2)

Dipankar

Reputation: 3510

Another solution I just found for adding the file name as one of the columns of a DataFrame:

import org.apache.spark.sql.functions.input_file_name

val df = sqlContext.read.parquet("/blah/dir")

// Add the source file name of each row as a new column
val dfWithCol = df.withColumn("filename", input_file_name())
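
Since input_file_name() is an ordinary column expression, the same approach should also cover the other readers mentioned in the question; a minimal sketch, assuming Spark 1.6+ with the spark-csv package available and placeholder input paths:

// CSV read through the spark-csv package (paths are placeholders)
val csvWithFile = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/blah/csv-dir")
  .withColumn("filename", input_file_name())

// JSON works the same way
val jsonWithFile = sqlContext.read
  .json("/blah/json-dir")
  .withColumn("filename", input_file_name())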

Ref: spark load data and add filename as dataframe column

Upvotes: 17

Beryllium

Reputation: 13008

When you create an RDD from a text file, you probably want to map the data into a case class, so you can add the input source at that stage:

case class Person(inputPath: String, name: String, age: Int)

val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"

// Carry the input path into every record while parsing
val rdd = sc.textFile(inputPath).map { l =>
  val tokens = l.split(",")
  Person(inputPath, tokens(0), tokens(1).trim().toInt)
}
rdd.collect().foreach(println)

If you do not want to mix "business data" with metadata:

case class InputSourceMetaData(path: String, size: Long)
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)

// Fake the size, for demo purposes only
val md = InputSourceMetaData(inputPath, size = -1L)
val rdd = sc.textFile(inputPath).map {
  l =>
    val tokens = l.split(",")
    PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
rdd.collect().foreach(println)

and if you promote the RDD to a DataFrame:

import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")

you can query it like this:

sqlContext.sql("select name, metadata from x").show()
sqlContext.sql("select name, metadata.path from x").show()
sqlContext.sql("select name, metadata.path, metadata.size from x").show()

Update

You can list the files in HDFS recursively using org.apache.hadoop.fs.FileSystem.listFiles().
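
A minimal sketch of collecting those file statuses into a Scala collection, assuming the driver has access to the Hadoop configuration and using a placeholder input directory:

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import scala.collection.mutable.ArrayBuffer

// Recursively list all files under the (placeholder) input directory
val fs = FileSystem.get(sc.hadoopConfiguration)
val it = fs.listFiles(new Path("hdfs://localhost:9000/tmp/demo-input-data"), true)

val files = ArrayBuffer[LocatedFileStatus]()
while (it.hasNext) files += it.next()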

Given the list of files in a value files (a standard Scala collection containing org.apache.hadoop.fs.LocatedFileStatus), you can create one RDD per file:

val rdds = files.map { f =>
  // This time the size is real, taken from the file status
  val md = InputSourceMetaData(f.getPath.toString, f.getLen)

  sc.textFile(md.path).map { l =>
    val tokens = l.split(",")
    PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
  }
}

Now you can reduce the list of RDDs into a single one; the function passed to reduce concatenates all of the RDDs:

val rdd = rdds.reduce(_ ++ _)
rdd.collect().foreach(println)

This works, but I cannot test whether it distributes/performs well with large files.

Upvotes: 2
