VictorGram

Reputation: 2661

Scala/Spark : How to do outer join based on common columns

I have 2 input files. I read those files and created dataframes from them as follows:

val dataRecordsTemp = sc.textFile(tempFile).map{rec=>
            val splittedRec = rec.split("\\s+")
            Temparature(splittedRec(0),splittedRec(1),splittedRec(2),splittedRec(3),splittedRec(4))
        }.map{x => Row.fromSeq(x.getDataFields())}

val headerFieldsForTemp = Seq("YEAR","MONTH","DAY","MAX_TEMP","MIN_TEMP")
val schemaTemp = StructType(headerFieldsForTemp.map{f => StructField(f, StringType, nullable=true)})
val dfTemp = session.createDataFrame(dataRecordsTemp,schemaTemp)
              .orderBy(desc("year"), desc("month"), desc("day"))

println("Printing temperature data ...............................")
dfTemp.select("YEAR","MONTH","DAY","MAX_TEMP","MIN_TEMP").take(10).foreach(println)

val dataRecordsPrecip = sc.textFile(precipFile).map{rec=>
        val splittedRec = rec.split("\\s+")
        Precipitation(splittedRec(0),splittedRec(1),splittedRec(2),splittedRec(3),splittedRec(4),splittedRec(5))
      }.map{x => Row.fromSeq(x.getDataFields())}

val headerFieldsForPrecipitation = Seq("YEAR","MONTH","DAY","PRECIPITATION","SNOW","SNOW_COVER")
val schemaPrecip = StructType(headerFieldsForPrecipitation.map{f => StructField(f, StringType, nullable=true)})
val dfPrecip = session.createDataFrame(dataRecordsPrecip,schemaPrecip)
      .orderBy(desc("year"), desc("month"), desc("day"))

println("Printing precipitation data ...............................")
dfPrecip.select("YEAR","MONTH","DAY","PRECIPITATION","SNOW","SNOW_COVER").take(10).foreach(println)
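One thing to watch for: since the input files have a header line, `sc.textFile` will feed that header row into the parser as well. A minimal sketch of skipping it, assuming the same file layout as above (`lines` and `header` are names introduced here for illustration):

```scala
// Sketch: drop the header row before parsing (assumes the first line
// of the file is the header shown in the question).
val lines = sc.textFile(tempFile)
val header = lines.first()
val dataRecordsTemp = lines
  .filter(_ != header)              // keep only the data rows
  .map { rec =>
    val splittedRec = rec.split("\\s+")
    Temparature(splittedRec(0), splittedRec(1), splittedRec(2), splittedRec(3), splittedRec(4))
  }
  .map(x => Row.fromSeq(x.getDataFields()))
```

The same filter applies to the precipitation file before building `dataRecordsPrecip`.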

I have to join the 2 DataFrames on the common columns (year, month, day). The input files have a header, and the output file will have a header as well. The 1st file has temperature information, for example:

year month day min-temp max-temp
2017 12    13  13       25
2017 12    16  25       32
2017 12    25  34       56

The 2nd file has precipitation information, for example:

year month day precipitation snow snow-cover
2018  7    6   0.00          0.0  0
2017  12   13  0.04          0.0  0
2017  12   16  0.4           0.04 1

My expected output should be (ordered by date ascending; blank if no value is found):

year month day min-temp max-temp precipitation snow snow-cover
2017 12    13  13       25       0.04          0.0  0
2017 12    16  25       32       0.4           0.04 1
2017 12    25  34       56                 
2018  7    6                     0.00          0.0  0

May I get help on how to do that in Scala?

Upvotes: 0

Views: 256

Answers (1)

Mikhail Dubkov

Reputation: 1233

You need to outer-join these two datasets on the common columns and then order the result, like this:

import org.apache.spark.sql.functions._

dfTemp
      .join(dfPrecip, Seq("year", "month", "day"), "outer")
      .orderBy("year", "month", "day") // ascending, matching the expected output
      .na.fill("")

If you don't need blank values and are fine with null, you can omit .na.fill("").
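Since the question also asks for a header in the output file, here is a hedged sketch of writing the joined result out with one (the output path and space separator are assumptions for illustration, not from the original post):

```scala
// Sketch: outer-join, order ascending, blank out nulls, then write
// the result with a header row.
val joined = dfTemp
  .join(dfPrecip, Seq("year", "month", "day"), "outer")
  .orderBy("year", "month", "day")   // ascending, matching the expected output
  .na.fill("")

joined.write
  .option("header", "true")          // include the header line in the output
  .option("sep", " ")                // space-separated, like the input files
  .csv("/path/to/output")            // hypothetical output path
```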

Hope it helps!

Upvotes: 1
