DebD

Reputation: 386

Join files in Apache Spark

I have a file like this, code_count.csv:

code,count,year
AE,2,2008
AE,3,2008
BX,1,2005
CD,4,2004
HU,1,2003
BX,8,2004

Another file like this, details.csv:

code,exp_code
AE,Aerogon international
BX,Bloomberg Xtern
CD,Classic Divide
HU,Honololu

I want the total sum of count for each code, but in the final output I want the exp_code, like this:

Aerogon international,5
Bloomberg Xtern,9
Classic Divide,4

Here is my code

var countData = sc.textFile("C:/path/to/code_count.csv")
// skip the header row, then key each record by code with its count
var countDataKV = countData.filter(!_.startsWith("code")).map(_.split(",")).map(x => (x(0), x(1).toInt))
var sum = countDataKV.foldByKey(0)((acc, ele) => acc + ele)
sum.take(2)

gives

Array[(String, Int)] = Array((AE,5), (BX,9))

Here sum is an RDD[(String, Int)]. I am kind of confused about how to pull the exp_code from the other file. Please guide.

Upvotes: 0

Views: 66

Answers (2)

koiralo

Reputation: 23109

You need to calculate the sum after grouping by code and then join with the other DataFrame. Below is a similar example.

import spark.implicits._
import org.apache.spark.sql.functions._

val df1 = spark.sparkContext.parallelize(Seq(("AE",2,2008), ("AE",3,2008), ("BX",1,2005),
    ("CD",4,2004), ("HU",1,2003), ("BX",8,2004)))
  .toDF("code","count","year")

val df2 = spark.sparkContext.parallelize(Seq(("AE","Aerogon international"),
    ("BX","Bloomberg Xtern"), ("CD","Classic Divide"), ("HU","Honololu")))
  .toDF("code","exp_code")

// total count per code, then join back to get exp_code and drop the key
val sumdf1 = df1.select("code", "count").groupBy("code").agg(sum("count"))

val finalDF = sumdf1.join(df2, "code").drop("code")

finalDF.show()
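
Since sum in the question is already an RDD[(String, Int)], the same result can also be reached without DataFrames. A minimal sketch, assuming details.csv sits at an analogous path and is keyed the same way:

// key the details file by code: RDD[(String, String)]
val details = sc.textFile("C:/path/to/details.csv")
val detailsKV = details.filter(!_.startsWith("code")).map(_.split(",")).map(x => (x(0), x(1)))

// join on code, then keep only (exp_code, total)
val result = sum.join(detailsKV).map { case (_, (total, expCode)) => (expCode, total) }
result.collect().foreach(println)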

Upvotes: 1

Aravind Kumar Anugula

Reputation: 1326

If you are using Spark version 2.0 or later, you can use the following code directly; CSV support (formerly com.databricks.spark.csv) is built into Spark 2.0.

val codeDF = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://pathTo/code_count.csv")    

val detailsDF = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://pathTo/details.csv")    
import org.apache.spark.sql.functions._

val resDF = codeDF.join(detailsDF, codeDF.col("code") === detailsDF.col("code"))
  .groupBy(codeDF.col("code"), detailsDF.col("exp_code"))
  .agg(sum("count").alias("cnt"))
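
resDF still carries the join key; to match the output format in the question, a final projection along these lines should do:

// keep only the expanded code and the total
resDF.select("exp_code", "cnt").show()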

If you are using Spark version <= 1.6, you can use the following code.

You can follow this link to use com.databricks.spark.csv:

https://github.com/databricks/spark-csv
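
For reference, a sketch of how the package is typically pulled into a build with sbt (the version shown is illustrative; check the repository for the current one). The equivalent for an interactive session is the --packages flag on spark-shell.

// build.sbt: add the spark-csv connector (illustrative version)
libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"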

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._

val codeDF = hiveContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .load("hdfs://pathTo/code_count.csv")

val detailsDF = hiveContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .load("hdfs://pathTo/details.csv")

import org.apache.spark.sql.functions._
val resDF = codeDF.join(detailsDF, codeDF.col("code") === detailsDF.col("code"))
  .groupBy(codeDF.col("code"), detailsDF.col("exp_code"))
  .agg(sum("count").alias("cnt"))

Upvotes: 1
