Reputation: 3071
I have 2 Spark datasets: lessonDS and latestLessonDS.
These are my Spark dataset POJOs:
Lesson class:
private List<SessionFilter> info;
private lessonId;
LatestLesson class:
private String id;
SessionFilter class:
private String id;
private String sessionName;
I want to get all Lesson data where the info.id values in the Lesson class are not among the LatestLesson ids.
something like this:
lessonDS.filter(explode(col("info.id")).notEqual(latestLessonDS.col("value"))).show();
latestLessonDS contains:
100A
200C
300A
400A
lessonDS contains:
1,[[100A,jon],[200C,natalie]]
2,[[100A,jon]]
3,[[600A,jon],[400A,Kim]]
result:
3,[[600A,jon]]
Upvotes: 1
Views: 118
Reputation: 14905
Usually the array_contains function could be used as a join condition when joining lessonDs and latestLessonDs. But this function does not work here, as the join condition requires that all elements of lessonDs.info.id appear in latestLessonDs.
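For reference, that usual array_contains join would look roughly like the sketch below (assuming latestLessonDs exposes the LatestLesson id as a column named id, as in the code further down); it only requires that at least one matching id exists, which is why it is not sufficient here:
import org.apache.spark.sql.functions._

// Not the solution for this case: keeps a lesson row for every latest id that
// appears somewhere in its info.id array, i.e. an "at least one" match instead of "all".
lessonDs.join(latestLessonDs,
  array_contains(lessonDs.col("info.id"), latestLessonDs.col("id")))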
A way to get the result is to explode lessonDs, join with latestLessonDs, and then check whether an entry in latestLessonDs exists for all elements of lessonDs.info by comparing the number of info elements before and after the join:
import org.apache.spark.sql.functions._
import spark.implicits._

lessonDs
  .withColumn("noOfEntries", size('info))      // number of info elements per lesson
  .withColumn("id", explode(col("info.id")))   // one row per info.id
  .join(latestLessonDs, "id")                  // drops ids that are not in latestLessonDs
  .groupBy("lessonId", "info", "noOfEntries").count()
  .filter("noOfEntries = count")               // keep lessons where every info.id survived the join
  .drop("noOfEntries", "count")
  .show(false)
prints
+--------+------------------------------+
|lessonId|info |
+--------+------------------------------+
|1 |[[100A, jon], [200C, natalie]]|
|2 |[[100A, jon]] |
+--------+------------------------------+
Upvotes: 1
Reputation: 2208
If the size of latestLessonDS is reasonable enough, you can collect and broadcast it, and then a simple filter transformation on lessonDS will give you the desired result, along these lines (the getInfo/getId bean getters below are assumed from the POJOs above):
import scala.collection.JavaConversions._
import spark.implicits._

// collect the (small) latestLessonDS on the driver and broadcast it
val bc = spark.sparkContext.broadcast(latestLessonDS.collectAsList().toSeq)

lessonDS.mapPartitions(itr => {
  val cache = bc.value
  // ids present in latestLessonDS (getId is the assumed bean getter for LatestLesson.id)
  val latestIds = cache.map(_.getId).toSet
  // check in cache: keep lessons that have at least one info.id not in latestLessonDS,
  // matching the expected result in the question (getInfo assumed as the Lesson getter)
  itr.filter(x => x.getInfo.exists(sf => !latestIds.contains(sf.getId)))
})
Upvotes: 2