Abu Shoeb

Reputation: 5152

How can I do full outer join on multiple csv files (Linux or Scala)?

I have 620 CSV files, each with different columns and data. For example:

//file1.csv
word, count1
w1, 100
w2, 200

//file2.csv
word, count2
w1, 12
w5, 22

//Similarly fileN.csv
word, countN
w7, 17
w2, 28

My expected output:

//result.csv
word, count1, count2, countN
w1,   100,    12,     null
w2,   200,    null,   28
w5,   null,   22,     null
w7,   null,   null,   17

I was able to do it in Scala for two files like this, where df1 is file1.csv and df2 is file2.csv:

df1.join(df2, Seq("word"),"fullouter").show()

I need a solution to do this, either in Scala or with a Linux command.

Upvotes: 1

Views: 492

Answers (1)

vindev

Reputation: 2280

Using Spark you can read each of your files into a DataFrame and store them all in a List[DataFrame]. You can then apply reduce on that list to join all the DataFrames together. The following code uses three DataFrames, but you can extend the same approach to all of your files.

//create all three dummy DFs (assumes spark-shell; otherwise import spark.implicits._ for toDF)
import org.apache.spark.sql.DataFrame
val df1 = sc.parallelize(Seq(("w1", 100), ("w2", 200))).toDF("word", "count1")
val df2 = sc.parallelize(Seq(("w1", 12), ("w5", 22))).toDF("word", "count2")
val df3 = sc.parallelize(Seq(("w7", 17), ("w2", 28))).toDF("word", "count3")

//store all DFs in a list
val dfList: List[DataFrame] = List(df1, df2, df3)

//apply reduce function to join them together
val joinedDF = dfList.reduce((a, b) => a.join(b, Seq("word"), "fullouter"))

joinedDF.show()
//output
//+----+------+------+------+
//|word|count1|count2|count3|
//+----+------+------+------+
//|  w1|   100|    12|  null|
//|  w2|   200|  null|    28|
//|  w5|  null|    22|  null|
//|  w7|  null|  null|    17|
//+----+------+------+------+

//To write to CSV file
joinedDF.write
  .option("header", "true")
  .csv("PATH_OF_CSV")

This is how you can read all your files and store them in a List:

//declare a ListBuffer to store all DFs
import scala.collection.mutable.ListBuffer
val dfList = ListBuffer[DataFrame]()

(1 to 620).foreach { x =>
  val df: DataFrame = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load(BASE_PATH + s"file$x.csv")

  dfList += df
}
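
As a side note, if you are on Spark 2.x or later, the CSV source is built in, so you don't need the com.databricks.spark.csv package, and the loop can be written without a mutable buffer. A minimal sketch, assuming the same BASE_PATH and file$x.csv naming as above, where spark is the SparkSession:

//Spark 2.x+: built-in CSV reader, immutable List built with map
import org.apache.spark.sql.DataFrame

val dfList: List[DataFrame] = (1 to 620).toList.map { x =>
  spark.read
    .option("header", "true")
    .csv(BASE_PATH + s"file$x.csv")
}

//then reduce with full outer joins as before
val joinedDF = dfList.reduce((a, b) => a.join(b, Seq("word"), "fullouter"))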

Upvotes: 2
