Reputation: 55
I am reading multiple files from an HDFS directory, and for each file the generated data is printed using:
frequencies.foreach(x => println(x._1 + ": "+x._2))
And the printed data is (for File1.txt):
'text': 45
'data': 100
'push': 150
The keys can be different for other files, e.g. File2.txt:
'data': 45
'lea': 100
'jmp': 150
The keys are not necessarily the same in all the files. I want the data from all the files to be written to a .csv file in the following format:
Filename   text  data  push  lea  jmp
File1.txt  45    100   150   0    0
File2.txt  0     45    0     100  150  ....
Can someone please help me find a solution to this problem?
Upvotes: 2
Views: 1127
Reputation: 10092
I'd suggest creating one DataFrame for all the files inside your directory and then using a pivot to reshape the data accordingly:
// lit comes from org.apache.spark.sql.functions and $/toDF from spark.implicits
// (both are available by default in spark-shell)
val df1 = sc.parallelize(Array(
  ("text", 45),
  ("data", 100),
  ("push", 150))).toDF("key", "value").withColumn("Filename", lit("File1"))

val df2 = sc.parallelize(Array(
  ("data", 45),
  ("lea", 100),
  ("jump", 150))).toDF("key", "value").withColumn("Filename", lit("File2"))

val df = df1.unionAll(df2) // use union on Spark 2.x+
df.show
+----+-----+--------+
| key|value|Filename|
+----+-----+--------+
|text| 45| File1|
|data| 100| File1|
|push| 150| File1|
|data| 45| File2|
| lea| 100| File2|
|jump| 150| File2|
+----+-----+--------+
val finalDf = df.groupBy($"Filename").pivot("key").agg(first($"value")).na.fill(0)
finalDf.show
+--------+----+----+---+----+----+
|Filename|data|jump|lea|push|text|
+--------+----+----+---+----+----+
| File1| 100| 0| 0| 150| 45|
| File2| 45| 150|100| 0| 0|
+--------+----+----+---+----+----+
You can write the result as a CSV using DataFrameWriter:
finalDf.write.csv(..)
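If you also want the header row and a single output file, one way could be the following (a sketch; the output path is a placeholder):
finalDf
  .coalesce(1)                    // one partition -> a single CSV part file
  .write
  .option("header", "true")       // write the column names as the first row
  .mode("overwrite")
  .csv("hdfs:///path/to/output")  // placeholder path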
The hard part here is creating a separate DataFrame for each file, with an extra Filename column recording which file it was built from, before union-ing them, as sketched below.
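For that part, one option could be a small helper that tags each per-file frequencies RDD with its file name; a minimal sketch, assuming frequencies is an RDD[(String, Int)] and freqs1/freqs2 stand in for the RDDs you already compute for File1.txt and File2.txt:
import org.apache.spark.rdd.RDD

// hypothetical helper: tag a per-file frequencies RDD with the file it came from
// (toDF needs spark.implicits, available by default in spark-shell)
def toNamedDf(frequencies: RDD[(String, Int)], fileName: String) =
  frequencies.toDF("key", "value").withColumn("Filename", lit(fileName))

// freqs1 / freqs2 are placeholders for the RDDs computed from File1.txt and File2.txt
val df = Seq(toNamedDf(freqs1, "File1.txt"), toNamedDf(freqs2, "File2.txt"))
  .reduce(_ unionAll _) // use union on Spark 2.x+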
Upvotes: 0
Reputation: 1110
If your files aren't too big, you can do this without Spark. Here is my example code; the CSV layout is plain and doesn't exactly match your expected output, but you can tweak it easily.
import java.io.PrintWriter
import scala.io.Source
import org.apache.hadoop.fs._
val sparkSession = ... // created only to retrieve the Hadoop configuration; you can build your own Configuration instead
val inputPath = ...
val outputPath = ...
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
// read every *.txt file into (fileName, Map[key -> value])
val filesContent = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath)
  .filter(_.getName.endsWith(".txt"))
  .map(s => (s.getName, Source.fromInputStream(fs.open(s)).getLines()
    .map(_.split(":").map(_.trim))
    .filter(_.length == 2)
    .map(p => (p.head, p.last)).toMap))
// default row: every key seen in any file, mapped to "0"
val listKeys = filesContent.flatMap(_._2.keys).distinct.map(s => (s, "0")).toMap
// for each file, overlay its values on the defaults and build one CSV row
val csvContent = filesContent.map(s => (s._1, listKeys ++ s._2))
  .map(s => (s._1, s._2.values.mkString(",")))
  .map(s => s"${s._1},${s._2}")
  .mkString("\n")
val csvHeader = ("Filename" +: listKeys.keys.toList).mkString(",")
val csv = csvHeader + "\n" + csvContent
// write the CSV to the output path on HDFS
new PrintWriter(fs.create(new Path(outputPath))) {
  write(csv)
  close()
}
Upvotes: 1