torz

Reputation: 41

How to get all differing records from two different Spark RDDs

Very, very new to Spark and RDDs, so I hope I explain what I'm after well enough for someone to understand and help :)

I have two very large sets of data, let's say 3 million rows with 50 columns each, stored in Hadoop HDFS. What I would like to do is read both of these into RDDs so the work is parallelised, and return a third RDD that contains all records (from either RDD) that do not match.

Below hopefully helps show what I'm looking to do... I'm just trying to find all the differing records in the fastest, most efficient way...

Data is not necessarily in the same order - row 1 of rdd1 may be row 4 of rdd2.

many thanks in advance!!

Example of datasets & desired result

So... This seems to be doing what I want it to, but it seems far too easy to be correct...

%spark

import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._   // needed for lit()
import sqlContext.implicits._
import org.apache.spark.sql._

//create the tab1 DataFrame, tagging each row with its source table.
val rdd1 = sqlContext.sql("select * FROM table1").withColumn("source",lit("tab1"))

//create the tab2 DataFrame, tagging each row with its source table.
val rdd2 = sqlContext.sql("select * FROM table2").withColumn("source",lit("tab2"))

//create the DataFrame of all misaligned records between table1 and table2:
//rows only in table1 plus rows only in table2.
val rdd3 = rdd1.except(rdd2).unionAll(rdd2.except(rdd1))

//rdd3.printSchema()    
//val rdd3 = rdd1.except(rdd2)

//drop the temporary table that was used to create a hive compatible table from the last run.
sqlContext.dropTempTable("table3")

//register the new temporary table.
rdd3.toDF().registerTempTable("table3")

//drop the old compare table.
sqlContext.sql("drop table if exists data_base.compare_table")

//create the new version of the s_asset compare table.
sqlContext.sql("create table data_base.compare_table as select * from table3")

This is the final bit of code I've ended up with so far, and it seems to be doing the job - not sure how it will perform on the full dataset, will keep my fingers crossed...
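For a quick sanity check on the result, the two directions of the difference can be counted separately (a minimal sketch reusing the names from the snippet above):

//count how many rows appear only in each table; together they should account for everything in rdd3.
val onlyInTab1 = rdd1.except(rdd2).count()
val onlyInTab2 = rdd2.except(rdd1).count()
println(s"only in table1: $onlyInTab1, only in table2: $onlyInTab2")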

many thanks to all that took the time to help this poor pleb out :)

P.S. If anyone has a solution with a little more performance I'd love to hear it! Or if you can spot some issue with this that might mean it returns the wrong results.

Upvotes: 0

Views: 423

Answers (3)

pasha701

Reputation: 7207

The two DataFrames can be joined with "full_outer", and then a filter applied that compares the field values on both sides:

// "cols" is assumed to be the list of column names to compare,
// e.g. Seq("name", "status", "lastupdated").
val filterCondition = cols
  .map(c => (col(s"l.$c") =!= col(s"r.$c") || col(s"l.$c").isNull || col(s"r.$c").isNull))
  .reduce((acc, c) => acc || c)

df1.alias("l")
  .join(df2.alias("r"), $"l.rowid" === $"r.rowid", "full_outer")
  .where(filterCondition)

Output:

+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
|rowid   |name    |status  |lastupdated|source|rowid   |name    |status  |lastupdated|source|
+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
|1-za23f1|product2|inactive|31-12-2019 |rdd1  |1-za23f1|product2|active  |31-12-2019 |rdd2  |
|1-za23f2|product3|inactive|01-01-2020 |rdd1  |1-za23f2|product3|active  |01-01-2020 |rdd2  |
|1-za23f3|product4|inactive|02-01-2020 |rdd1  |1-za23f3|product1|inactive|02-01-2020 |rdd2  |
+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+

Upvotes: 0

RainaMegha

Reputation: 136

You can read the data into DataFrames rather than RDDs, and then use union and group by to achieve the result.
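A minimal sketch of that idea, assuming the two inputs are already loaded as df1 and df2 with the example's columns (rowid, name, status, lastupdated):

import org.apache.spark.sql.functions._

// Union both inputs, then keep the rows whose column combination
// occurs only once, i.e. exists in just one of the two DataFrames.
val mismatched = df1.union(df2)
  .groupBy("rowid", "name", "status", "lastupdated")
  .count()
  .filter(col("count") === 1)
  .drop("count")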

Upvotes: 0

QuickSilver

Reputation: 4045

  1. Load both of your DataFrames as df1 and df2
  2. Add a source column with the default value rdd1 and rdd2 respectively
  3. Union df1 and df2
  4. Group by "rowid", "name", "status", "lastupdated" and collect the sources as a set
  5. Filter all rows that have a single source

import org.apache.spark.sql.functions._

object OuterJoin {

  def main(args: Array[String]): Unit = {

    // Constant.getSparkSess is a project-specific helper that returns a SparkSession.
    val spark = Constant.getSparkSess

    import spark.implicits._

    val cols = Array("rowid", "name", "status", "lastupdated")

    // First input, tagged with its source.
    val df1 = List(
      ("1-za23f0", "product1", "active", "30-12-2019"),
      ("1-za23f1", "product2", "inactive", "31-12-2019"),
      ("1-za23f2", "product3", "inactive", "01-01-2020"),
      ("1-za23f3", "product4", "inactive", "02-01-2020"),
      ("1-za23f4", "product5", "inactive", "03-01-2020"))
      .toDF(cols: _*)
      .withColumn("source", lit("rdd1"))

    // Second input, tagged with its source.
    val df2 = List(
      ("1-za23f0", "product1", "active", "30-12-2019"),
      ("1-za23f1", "product2", "active", "31-12-2019"),
      ("1-za23f2", "product3", "active", "01-01-2020"),
      ("1-za23f3", "product1", "inactive", "02-01-2020"),
      ("1-za23f4", "product5", "inactive", "03-01-2020"))
      .toDF(cols: _*)
      .withColumn("source", lit("rdd2"))

    // Union both inputs, group by the data columns, collect the sources as a set,
    // and keep only the rows that appear in a single source.
    df1.union(df2)
      .groupBy(cols.map(col): _*)
      .agg(collect_set("source").as("sources"))
      .filter(size(col("sources")) === 1)
      .withColumn("from_rdd", explode(col("sources")))
      .drop("sources")
      .show()
  }

}
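To apply this to the question's setup, df1 and df2 would come from the Hive tables instead of the inline lists (a sketch, assuming both tables share the same columns):

val df1 = spark.table("table1").withColumn("source", lit("rdd1"))
val df2 = spark.table("table2").withColumn("source", lit("rdd2"))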

Upvotes: 2
