Neethu Lalitha

Reputation: 3071

Spark dataset filter elements

I have 2 Spark datasets: lessonDS and latestLessonDS.

These are my Spark dataset POJOs:

Lesson class:

    private List<SessionFilter> info;
    private String lessonId;

LatestLesson class:

    private String id;

SessionFilter class:

    private String id;
    private String sessionName;

I want to get all Lesson entries whose info.id values are not among the LatestLesson ids.

Something like this:

lessonDS.filter(explode(col("info.id")).notEqual(latestLessonDS.col("value"))).show();

latestLessonDS contains:

100A
200C
300A
400A


lessonDS contains:

1,[[100A,jon],[200C,natalie]]
2,[[100A,jon]]
3,[[600A,jon],[400A,Kim]]

Expected result:

3,[[600A,jon]]

Upvotes: 1

Views: 118

Answers (2)

werner

Reputation: 14905

Usually the array_contains function could be used as a join condition when joining lessonDs and latestLessonDs. But it does not work here, because the required condition is that all elements of lessonDs.info.id appear in latestLessonDS.
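For illustration only (a sketch, not part of the original answer), such an array_contains join would look roughly like this; it keeps a row for every info.id that has a match, i.e. it checks "at least one id matches" rather than "all ids match":

    import org.apache.spark.sql.functions.{array_contains, col}

    // sketch: one output row per matching info.id, so lessons with a single match survive
    lessonDs
      .join(latestLessonDs, array_contains(col("info.id"), latestLessonDs.col("id")))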

A way to get the result is to explode lessonDs, join it with latestLessonDs, and then check whether an entry in latestLessonDs exists for every element of lessonDs.info by comparing the number of info elements before and after the join:

import org.apache.spark.sql.functions.{col, explode, size}

lessonDs
  .withColumn("noOfEntries", size(col("info")))          // number of info elements per lesson
  .withColumn("id", explode(col("info.id")))             // one row per info id
  .join(latestLessonDs, "id")                            // inner join drops ids without a match
  .groupBy("lessonId", "info", "noOfEntries").count()    // rows surviving the join per lesson
  .filter("noOfEntries = count")                         // keep lessons whose ids all matched
  .drop("noOfEntries", "count")
  .show(false)

prints

+--------+------------------------------+
|lessonId|info                          |
+--------+------------------------------+
|1       |[[100A, jon], [200C, natalie]]|
|2       |[[100A, jon]]                 |
+--------+------------------------------+
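The count comparison works because the inner join drops every exploded row whose id has no match in latestLessonDs; a lesson's count after the join equals its original noOfEntries only when every element of info found a matching id.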

Upvotes: 1

kavetiraviteja

Reputation: 2208

If your latestLessonDS dataset is reasonably small, you can collect it and broadcast it; a simple filter transformation on lessonDS will then give you the desired result.

Like this:

import scala.collection.JavaConversions._
import spark.implicits._

// broadcast the (small) collected latestLessonDS to all executors
val bc = spark.sparkContext.broadcast(latestLessonDS.collectAsList().toSeq)

// note: if Lesson is a Java bean, mapPartitions may need Encoders.bean(classOf[Lesson]) passed explicitly
lessonDS.mapPartitions(itr => {
  // read the broadcast value once per partition
  val cachedIds = bc.value.map(_.getId).toSet
  itr.filter(x => {
    // check in cache: e.g. keep lessons with at least one info id missing from latestLessonDS
    // (getter names getInfo/getId assumed from the question's POJOs)
    x.getInfo.exists(info => !cachedIds.contains(info.getId))
  })
})
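Collecting and broadcasting like this is only appropriate while latestLessonDS is small enough to fit in memory; reading bc.value inside mapPartitions means the lookup set is built once per partition rather than once per row.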

Upvotes: 2
