Reputation: 571
I am new to Spark and asked a similar question last week. The code I ended up with compiles but does not work, so I really don't know what to do. Here is my problem: I have a table A containing 3 columns, like this
-----------
A1 A2 A3
-----------
a b c
and another table B like this
------------------------------------
B1 B2 B3 B4 B5 B6 B7 B8 B9
------------------------------------
1 a 3 4 5 b 7 8 c
My logic is: A1, A2, A3 are my key, and they correspond to B2, B6, B9 in table B. I need to build a lookup function that takes A1, A2, A3 as the key and returns B8.
This is what I tried last week:
//getting the data in to dataframe
val clsrowRDD = clsfile.map(_.split("\t")).map(p => Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8)))
val clsDataFrame = sqlContext.createDataFrame(clsrowRDD, clsschema)
//mapping the three key with the value
val smallRdd = clsDataFrame.rdd.map{row: Row => (mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))), row.getString(7))}
val lookupMap:Map[mutable.WrappedArray[String], String] = smallRdd.collectAsMap()
//build the look up function
def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))
//call the function
val combinedDF = mstrDataFrame.withColumn("ENTP_CLS_CD",lookup(lookupMap)($"SRC_SYS_CD",$"ORG_ID",$"ORG_CD"))
This code compiles, but it doesn't return the results I need. I think it's because I am passing in an array as the key and I don't really have an array inside my table. But when I tried changing the map type to Map[(String,String,String),String], I didn't know how to pass it into the function.
Tons of thanks.
Upvotes: 0
Views: 558
Reputation: 41957
If you are trying to get the B8 value for every row where A1 matches B2, A2 matches B6, and A3 matches B9, then a simple join and select should do the trick. Creating a lookup map would add complexity.
As you explained, you have two dataframes, df1 and df2, as
+---+---+---+
|A1 |A2 |A3 |
+---+---+---+
|a |b |c |
+---+---+---+
+---+---+---+---+---+---+---+---+---+
|B1 |B2 |B3 |B4 |B5 |B6 |B7 |B8 |B9 |
+---+---+---+---+---+---+---+---+---+
|1 |a |3 |4 |5 |b |7 |8 |c |
|1 |a |3 |4 |5 |b |7 |8 |e |
+---+---+---+---+---+---+---+---+---+
A simple join and select can be done with
df1.join(df2, $"A1" === $"B2" && $"A2" === $"B6" && $"A3" === $"B9", "inner").select("B8")
which should give you
+---+
|B8 |
+---+
|8 |
+---+
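If the goal is to keep every row of df1 and attach B8 as an extra column (which is what a lookup map would do), a left join may be closer to what you want than an inner join. The following is only a minimal sketch, assuming Spark 2.x or later with a local SparkSession; the object name JoinLookupSketch, the helper lookupJoin, and the sample row (x, y, z) are made up for illustration and are not from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Hypothetical names; only the column names A1..A3 and B1..B9 come from the question.
object JoinLookupSketch {

  // Left join: every row of df1 is kept; rows with no match get a null B8.
  def lookupJoin(df1: DataFrame, df2: DataFrame): DataFrame = {
    // Keep only the three key columns and the value column of the lookup side.
    val lookupDf = df2.select("B2", "B6", "B9", "B8")
    df1
      .join(
        broadcast(lookupDf), // hint only; assumes the lookup table is small
        df1("A1") === lookupDf("B2") &&
          df1("A2") === lookupDf("B6") &&
          df1("A3") === lookupDf("B9"),
        "left")
      .select("A1", "A2", "A3", "B8")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("join-lookup-sketch")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(("a", "b", "c"), ("x", "y", "z")).toDF("A1", "A2", "A3")
    val df2 = Seq(
      ("1", "a", "3", "4", "5", "b", "7", "8", "c"),
      ("1", "a", "3", "4", "5", "b", "7", "8", "e")
    ).toDF("B1", "B2", "B3", "B4", "B5", "B6", "B7", "B8", "B9")

    // (a, b, c) is matched with B8 = 8; (x, y, z) has no match and gets null.
    lookupJoin(df1, df2).show()

    spark.stop()
  }
}
```

Note that if df2 contains several rows with the same (B2, B6, B9) key, the join returns one output row per match, whereas a map would keep only one value per key.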
I hope the answer is helpful
Updated
According to what I understood from your question and the comments below, you are confused about how to pass an array to your lookup udf function. For that you can use the array function, which combines several columns into a single array column. I have modified some parts of your almost perfect code to make it work
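//imports needed by the snippet below
import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{array, udf}
//mapping the three keys to the value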
val smallRdd = clsDataFrame.rdd
.map{row: Row => (mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))), row.getString(7))}
val lookupMap: collection.Map[mutable.WrappedArray[String], String] = smallRdd.collectAsMap()
//build the look up function
def lookup(lookupMap: collection.Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))
//call the function
val combinedDF = mstrDataFrame.withColumn("ENTP_CLS_CD",lookup(lookupMap)(array($"SRC_SYS_CD",$"ORG_ID",$"ORG_CD")))
You should have
+----------+------+------+-----------+
|SRC_SYS_CD|ORG_ID|ORG_CD|ENTP_CLS_CD|
+----------+------+------+-----------+
|a |b |c |8 |
+----------+------+------+-----------+
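As for the Map[(String,String,String),String] variant you mentioned: one way to pass it in is to give the udf three String parameters and build the tuple key inside it, so no array is needed at all. This is only a sketch under assumptions, not a drop-in replacement: it assumes Spark 2.x or later, that the key and value columns are named B2, B6, B9 and B8, and that the lookup table is small enough to collect to the driver. The names TupleKeyLookupSketch and buildLookup are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Hypothetical names; the DataFrame and column names mirror the question.
object TupleKeyLookupSketch {

  // Turn (key1, key2, key3, value) rows into a map keyed by a 3-tuple.
  def buildLookup(rows: Seq[(String, String, String, String)]): Map[(String, String, String), String] =
    rows.map { case (k1, k2, k3, v) => ((k1, k2, k3), v) }.toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("tuple-key-lookup-sketch")
      .getOrCreate()
    import spark.implicits._

    val clsDataFrame = Seq(("1", "a", "3", "4", "5", "b", "7", "8", "c"))
      .toDF("B1", "B2", "B3", "B4", "B5", "B6", "B7", "B8", "B9")
    val mstrDataFrame = Seq(("a", "b", "c"), ("x", "y", "z"))
      .toDF("SRC_SYS_CD", "ORG_ID", "ORG_CD")

    // Collect the (assumed small) lookup table to the driver.
    val lookupMap = buildLookup(
      clsDataFrame
        .select("B2", "B6", "B9", "B8")
        .as[(String, String, String, String)]
        .collect()
        .toSeq)

    // Broadcast once so each executor gets a single copy of the map.
    val lookupBroadcast = spark.sparkContext.broadcast(lookupMap)

    // Three String arguments; a missing key yields None, which becomes null.
    val lookup = udf((k1: String, k2: String, k3: String) =>
      lookupBroadcast.value.get((k1, k2, k3)))

    val combinedDF = mstrDataFrame
      .withColumn("ENTP_CLS_CD", lookup($"SRC_SYS_CD", $"ORG_ID", $"ORG_CD"))

    combinedDF.show()
    spark.stop()
  }
}
```

With this shape the call site passes the three columns directly, which is what your original call was already doing.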
Upvotes: 1