Reputation: 217
I am relatively new to Spark and I am trying to filter out invalid records from a Spark Dataset. My dataset looks something like this:
| Id | Curr | Col3 |
| 1  | USD  | 1111 |
| 2  | CNY  | 2222 |
| 3  | USD  | 3333 |
| 1  | CNY  | 4444 |
In my logic, each Id has exactly one valid currency, so the valid pairs can be expressed as a map of Id -> currency:
val map = Map(1 -> "USD", 2 -> "CNY")
I want to filter out the rows whose currency does not match the valid currency for that Id. After the filter operation, the dataset should look like this:
| Id | Curr | Col3 |
| 1  | USD  | 1111 |
| 2  | CNY  | 2222 |
The limitation I have here is that I cannot use a UDF. Can somebody help me come up with a filter operation for this?
Upvotes: 1
Views: 1748
Reputation: 262
val a = List((1, "USD", 1111), (2, "CAN", 2222), (3, "USD", 4444), (1, "CAN", 5555))
val b = Map(1 -> "USD", 2 -> "CAN")

// Keep a tuple only when its Id exists in the map and the currency matches;
// Option.contains does both checks in a single pass, so no lookup can throw.
a.filter(x => b.get(x._1).contains(x._2))
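If the data lives in a Spark Dataset rather than a local List, the same predicate can be passed to the Dataset's typed filter, which takes an ordinary Scala function rather than a UDF. A minimal sketch, assuming a hypothetical case class Record matching the question's columns and a spark-shell-style session (in compiled code the case class must be defined at the top level for the encoder to resolve):

import org.apache.spark.sql.SparkSession

case class Record(Id: Int, Curr: String, Col3: Int)

val spark = SparkSession.builder.appName("filter-example").getOrCreate()
import spark.implicits._

val validCurr = Map(1 -> "USD", 2 -> "CNY")
val ds = Seq(
  Record(1, "USD", 1111), Record(2, "CNY", 2222),
  Record(3, "USD", 3333), Record(1, "CNY", 4444)
).toDS()

// Typed filter: a plain Scala function, not a UDF; Option.contains
// returns false for Ids missing from the map, so nothing can throw.
ds.filter(r => validCurr.get(r.Id).contains(r.Curr)).show()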
Upvotes: -2
Reputation: 214957
You can create a data frame from the map and then do an inner join with the original data frame to filter it:
import spark.implicits._  // needed for the toDF conversion on a Seq

val map_df = map.toSeq.toDF("Id", "Curr")
// map_df: org.apache.spark.sql.DataFrame = [Id: int, Curr: string]

df.join(map_df, Seq("Id", "Curr")).show
+---+----+----+
| Id|Curr|Col3|
+---+----+----+
| 1| USD|1111|
| 2| CNY|2222|
+---+----+----+
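When the map is small, another UDF-free option is to fold it into a single filter condition instead of joining. A minimal sketch, reusing the df and map from above (the reduce assumes the map is non-empty):

import org.apache.spark.sql.functions.{col, lit}

// Build (Id = 1 AND Curr = 'USD') OR (Id = 2 AND Curr = 'CNY') as one column.
val cond = map
  .map { case (id, curr) => col("Id") === lit(id) && col("Curr") === lit(curr) }
  .reduce(_ || _)

df.filter(cond).show

For the join above, since map_df is tiny, wrapping it in broadcast(map_df) from org.apache.spark.sql.functions hints a broadcast hash join and avoids shuffling the larger table.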
Upvotes: 3