user4046073
user4046073

Reputation: 871

scala- dataset- how to use a function on two case classes?

I have two datasets and create two case classes (The reason why I don't join ds together is because I want to return the first matching record in ds B with key in ds A)

Below is my attempt so far. How can I map that function to two case classes? Many thanks! I'm quite new here.

case class a_class (idA: Int, numA: Int)
case class b_class(idB: Int, numB:Int )
def findNum(a:a_Class, b:b_class): Int = {
     if (a.idA =!=b.idB){
            break
       }else{
       return  b.numB
       }
 }
aTb.createOrReplaceTempView("tableA") 
bTb.createOrReplaceTempView("tableB")
var aDS = sqlContext.table("tableA").as[a_class]
var bDS = sqlContext.table("pview").as[b_class]
 //a_class.map(, => )).show //how do I use findNum function here? 

Example input:

 +------+---+
 |idA  |numA|
 +------+---+
 |    a |100|
 |    b |200|
 +------+---+

 +------+---+
 |idB   |numB|
 +------+---+
 |a     |500|
 |a     |600|
 +------+---+

So the expected output is 500, as the first row is the first matching record in table B

Upvotes: 1

Views: 274

Answers (1)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41977

your solution lies in join, groupBy and aggregation

First your case class and the sample input data are mismatching as a and b cannot be int type. So case classes should be

case class a_class (idA: String, numA: Int)
case class b_class(idB: String, numB:Int )

Using these case classes you can create dataSets. For testing purposes I am creating as following

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val tableA = Seq(
  a_class("a", 100),
  a_class("b", 200)
).toDS

val tableB = Seq(
  b_class("a", 500),
  b_class("a", 600)
).toDS

Then the final dataset can be achieved using following method.

tableA.join(tableB, $"idA" === $"idB", "inner") // inner join of two datasets
  .drop("idA", "numA")   //droping columns of tableB
  .groupBy("idB")        //grouping data to get the first of each group
  .agg(first("numB").as("numB"))    //taking the first of each group
  .show(false)

which should give you

+---+----+
|idB|numB|
+---+----+
|a  |600 |
+---+----+

Updated

Above result doesn't match with your desired output, this is because join reorders the tableB.

you can get your desired output by just doing

tableB.groupBy("idB")
  .agg(first("numB").as("numB"))
  .show(false)

the result will be first row of each id of tableB

+---+----+
|idB|numB|
+---+----+
|a  |500 |
+---+----+

If you want only the first rows with id that matches with tableA then you join with tableA as above and if you don't want data of tableA then you drop them as

val tempTableB = tableB.groupBy("idB")
  .agg(first("numB").as("numB"))

tableA.join(tempTableB, $"idA" === $"idB", "inner")
  .drop("idA", "numA")
  .show(false)

the output is first row of tableB that matches with id of tableA

+---+----+
|idB|numB|
+---+----+
|a  |500 |
+---+----+

Upvotes: 2

Related Questions