M Haris Khan

Reputation: 55

Writing RDD Data to CSV with Dynamic Columns in Spark - Scala

I am reading multiple files from an HDFS directory, and for each file the generated data is printed using:

frequencies.foreach(x => println(x._1 + ": "+x._2))

And the printed data is (for File1.txt):

'text': 45
'data': 100
'push': 150

The key can be different for other files like (File2.txt):

'data': 45
'lea': 100
'jmp': 150

The key is not necessarily the same in all the files. I want all the file data to be written to a .csv file in the following format:

Filename   text  data  push  lea  jmp
File1.txt  45    100   150   0    0
File2.txt  0     45    0     100  150  ....

Can someone please help me find a solution to this problem?

Upvotes: 2

Views: 1127

Answers (2)

philantrovert

Reputation: 10092

I'd suggest creating one dataframe for all the files inside your directory and then using a pivot to reshape the data accordingly:

// Needed outside spark-shell: implicits for toDF and the SQL functions used below
import spark.implicits._
import org.apache.spark.sql.functions.{lit, first}

val df1 = sc.parallelize(Array(
  ("text", 45),
  ("data", 100),
  ("push", 150))).toDF("key", "value").withColumn("Filename", lit("File1"))

val df2 = sc.parallelize(Array(
  ("data", 45),
  ("lea", 100),
  ("jump", 150))).toDF("key", "value").withColumn("Filename", lit("File2"))

val df = df1.unionAll(df2)

df.show
+----+-----+--------+
| key|value|Filename|
+----+-----+--------+
|text|   45|   File1|
|data|  100|   File1|
|push|  150|   File1|
|data|   45|   File2|
| lea|  100|   File2|
|jump|  150|   File2|
+----+-----+--------+


val finalDf = df.groupBy($"Filename").pivot("key").agg(first($"value")).na.fill(0)

finalDf.show
+--------+----+----+---+----+----+
|Filename|data|jump|lea|push|text|
+--------+----+----+---+----+----+
|   File1| 100|   0|  0| 150|  45|
|   File2|  45| 150|100|   0|   0|
+--------+----+----+---+----+----+

You can write the pivoted result as a CSV using DataFrameWriter:

finalDf.write.csv(..)
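
If you also want the pivoted column names as a header row, a minimal sketch (the output path here is a placeholder):

// Sketch only: write the pivoted dataframe with a header row to a placeholder path
finalDf.write
  .option("header", "true")
  .csv("hdfs:///path/to/output")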

The tricky part here is creating a separate dataframe for each file, with an extra Filename column recording which file that dataframe was built from; one way to do that is sketched below.
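
A minimal sketch of that step, assuming a SparkSession named `spark` (as in spark-shell) and that you already compute a `frequencies` RDD[(String, Int)] per file while reading the directory; the `toTaggedDf` helper and `perFileDfs` are placeholder names of mine:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import spark.implicits._

// Turn one file's word-frequency RDD into a dataframe tagged with its file name
def toTaggedDf(frequencies: RDD[(String, Int)], fileName: String): DataFrame =
  frequencies.toDF("key", "value").withColumn("Filename", lit(fileName))

// Build one tagged dataframe per file, then union them all and pivot as above:
// val df = perFileDfs.reduce(_ unionAll _)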

Upvotes: 0

Thang Nguyen

Reputation: 1110

If your files aren't too big, you can do this without Spark. Here is my example code; the CSV layout is plain and doesn't match your expected output exactly, but you can tweak it easily.

  import java.io.PrintWriter
  import scala.io.Source
  import org.apache.hadoop.fs._

  val sparkSession = ...  // I created it to retrieve the Hadoop configuration; you can create your own Configuration instead.
  val inputPath = ...
  val outputPath = ...

  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  // read every .txt file's content into an Array of (fileName, Map[key -> value])
  val filesContent = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath).filter(_.getName.endsWith(".txt"))
    .map(s => (s.getName, Source.fromInputStream(fs.open(s)).getLines()
                    .map(_.split(":").map(_.trim))
                    .filter(_.length == 2)
                    .map(p => (p.head, p.last)).toMap))
  // create a default Map with every key seen across all files mapped to "0"
  val listKeys = filesContent.flatMap(_._2.keys).distinct.map(s => (s, "0")).toMap
  // for each file, overlay its counts on the defaults and build one CSV row
  val csvContent = filesContent.map(s => (s._1, listKeys ++ s._2))
    .map(s => (s._1, s._2.values.mkString(",")))
    .map(s => s"${s._1},${s._2}")
    .mkString("\n")
  val csvHeader = ("Filename" +: listKeys.keys.toList).mkString(",")
  val csv = csvHeader + "\n" + csvContent

  // write the CSV to HDFS
  new PrintWriter(fs.create(new Path(outputPath))) {
    write(csv)
    close()
  }

Upvotes: 1
