Reputation: 871
I have two datasets and have created two case classes. (I don't simply join the datasets because I want to return only the first matching record in dataset B for each key in dataset A.)
Below is my attempt so far. How can I map that function over the two case classes? Many thanks! I'm quite new here.
case class a_class (idA: Int, numA: Int)
case class b_class(idB: Int, numB:Int )
def findNum(a:a_Class, b:b_class): Int = {
if (a.idA =!=b.idB){
break
}else{
return b.numB
}
}
aTb.createOrReplaceTempView("tableA")
bTb.createOrReplaceTempView("tableB")
var aDS = sqlContext.table("tableA").as[a_class]
var bDS = sqlContext.table("tableB").as[b_class]
//a_class.map(, => )).show //how do I use findNum function here?
Example input:
+---+----+
|idA|numA|
+---+----+
|a  |100 |
|b  |200 |
+---+----+
+---+----+
|idB|numB|
+---+----+
|a  |500 |
|a  |600 |
+---+----+
So the expected output is 500, since the row with numB 500 is the first record in table B that matches a key in table A.
Upvotes: 1
Views: 274
Reputation: 41977
Your solution lies in join, groupBy and aggregation.
First, your case classes and the sample input data don't match: the ids a and b cannot be of type Int. So the case classes should be
case class a_class (idA: String, numA: Int)
case class b_class(idB: String, numB:Int )
Using these case classes you can create the datasets. For testing purposes I am creating them as follows:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val tableA = Seq(
a_class("a", 100),
a_class("b", 200)
).toDS
val tableB = Seq(
b_class("a", 500),
b_class("a", 600)
).toDS
Then the final dataset can be obtained as follows.
tableA.join(tableB, $"idA" === $"idB", "inner") // inner join of two datasets
.drop("idA", "numA") //dropping the columns of tableA
.groupBy("idB") //grouping data to get the first of each group
.agg(first("numB").as("numB")) //taking the first of each group
.show(false)
which should give you
+---+----+
|idB|numB|
+---+----+
|a |600 |
+---+----+
Updated
The result above doesn't match your desired output because the join reorders the rows of tableB.
You can get your desired output by just doing
tableB.groupBy("idB")
.agg(first("numB").as("numB"))
.show(false)
The result will be the first row for each id of tableB:
+---+----+
|idB|numB|
+---+----+
|a |500 |
+---+----+
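One caveat about first: Spark documents it as non-deterministic, since its result depends on the order of the rows, which may change after a shuffle. If the order has to be guaranteed, a minimal sketch is to tag each row with its read order and pick the row with the smallest tag per id. The helper name firstPerId and the rowOrder and rn columns are hypothetical, and the sketch assumes that the order in which Spark reads tableB is the order you care about; if the data has a real ordering column (a timestamp or sequence number), order by that instead.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, row_number}

case class b_class(idB: String, numB: Int)

object FirstPerId {
  // Hypothetical helper: for each idB, keep the row that was read first.
  def firstPerId(tableB: Dataset[b_class]): DataFrame = {
    // Assumption: read order is the order that matters, so tag each row with it.
    val tagged = tableB.withColumn("rowOrder", monotonically_increasing_id())
    // Number the rows of each idB group by that explicit order.
    val byId = Window.partitionBy("idB").orderBy("rowOrder")
    tagged
      .withColumn("rn", row_number().over(byId))
      .filter(col("rn") === 1) // keep only the earliest row of each group
      .select("idB", "numB")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("first-per-id")
      .getOrCreate()
    import spark.implicits._

    val tableB = Seq(b_class("a", 500), b_class("a", 600)).toDS
    firstPerId(tableB).show(false)
    spark.stop()
  }
}
```

The window with an explicit orderBy makes the choice of row independent of how a later join or shuffle happens to arrange the data.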
If you want only the first rows whose id matches an id in tableA, join with tableA as above; if you don't want the columns of tableA in the result, drop them:
val tempTableB = tableB.groupBy("idB")
.agg(first("numB").as("numB"))
tableA.join(tempTableB, $"idA" === $"idB", "inner")
.drop("idA", "numA")
.show(false)
The output is the first row of tableB whose id matches an id in tableA:
+---+----+
|idB|numB|
+---+----+
|a |500 |
+---+----+
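For comparison, the lookup that the question's findNum was aiming at can be sketched on plain Scala collections. This is only an illustration of the "first match" semantics, assuming both tables are small enough to hold in memory; it is not a replacement for the Spark approach above. The signature used here (taking the whole sequence of b_class rows and returning an Option) is a hypothetical rewrite, not the original function.

```scala
case class a_class(idA: String, numA: Int)
case class b_class(idB: String, numB: Int)

object FirstMatchLocal {
  // Hypothetical rewrite of findNum: the numB of the first row in rowsB
  // whose idB equals a.idA, or None when no row matches.
  def findNum(a: a_class, rowsB: Seq[b_class]): Option[Int] =
    rowsB.find(_.idB == a.idA).map(_.numB)

  def main(args: Array[String]): Unit = {
    val rowsA = Seq(a_class("a", 100), a_class("b", 200))
    val rowsB = Seq(b_class("a", 500), b_class("a", 600))

    // Pair every key of A with its first match in B (if any).
    val result = rowsA.map(a => a.idA -> findNum(a, rowsB))
    println(result) // prints List((a,Some(500)), (b,None))
  }
}
```

Returning an Option avoids the break/return in the original attempt: a key with no match simply yields None.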
Upvotes: 2