Reputation: 41
Very, very new to Spark and RDDs, so I hope I explain what I'm after well enough for someone to understand and help :)
I have two very large sets of data, let's say 3 million rows with 50 columns each, stored in Hadoop HDFS. What I would like to do is read both of these into RDDs so that the work is parallelised, and return a third RDD that contains all records (from either RDD) that do not match.
Below hopefully shows what I'm looking to do... I'm just trying to find all differing records in the fastest, most efficient way...
The data is not necessarily in the same order - row 1 of rdd1 may be row 4 of rdd2.
Many thanks in advance!!
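For reference, the symmetric difference described above can also be expressed directly on RDDs. A minimal sketch, assuming the files are read as plain text so whole lines are compared, and using placeholder HDFS paths and names (lines1, lines2) that are not from the original post:
//placeholder paths - substitute the real HDFS locations
val lines1 = sc.textFile("hdfs:///path/to/table1")
val lines2 = sc.textFile("hdfs:///path/to/table2")
//records present on one side but not the other, in either direction
val mismatches = lines1.subtract(lines2).union(lines2.subtract(lines1))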
So... This seems to be doing what I want it to, but it seems far too easy to be correct...
%spark
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD
import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._ //needed for lit()
//create the tab1 DataFrame (named rdd1 for consistency with the question, but it is a DataFrame, not an RDD).
val rdd1 = sqlContext.sql("select * FROM table1")
//create the tab2 DataFrame.
val rdd2 = sqlContext.sql("select * FROM table2")
//create the DataFrame of all misaligned records between table1 and table2.
//the "source" tag is added after each except - adding it before would make every row differ and except would return everything.
val rdd3 = rdd1.except(rdd2).withColumn("source", lit("tab1")).unionAll(rdd2.except(rdd1).withColumn("source", lit("tab2")))
//rdd3.printSchema()
//val rdd3 = rdd1.except(rdd2)
//drop the temporary table that was used to create a hive compatible table from the last run.
sqlContext.dropTempTable("table3")
//register the new temporary table.
rdd3.registerTempTable("table3")
//drop the old compare table.
sqlContext.sql("drop table if exists data_base.compare_table")
//create the new version of the s_asset compare table.
sqlContext.sql("create table data_base.compare_table as select * from table3")
This is the final bit of code I've ended up with so far, and it seems to be doing the job - not sure about performance on the full dataset, so I'll keep my fingers crossed...
Many thanks to all that took the time to help this poor pleb out :)
P.S. If anyone has a solution with a little more performance I'd love to hear it! Or if you can see some issue with this that may mean it will return the wrong results.
Upvotes: 0
Views: 423
Reputation: 7207
Both DataFrames can be joined with a "full_outer" join, and then a filter applied that compares the field values on both sides:
import org.apache.spark.sql.functions.col
//columns to compare on both sides of the join (taken from the output below)
val cols = Seq("rowid", "name", "status", "lastupdated")
//a row is a mismatch if any compared column differs, or if it is missing on either side
val filterCondition = cols
  .map(c => col(s"l.$c") =!= col(s"r.$c") || col(s"l.$c").isNull || col(s"r.$c").isNull)
  .reduce((acc, c) => acc || c)
df1.alias("l")
  .join(df2.alias("r"), $"l.rowid" === $"r.rowid", "full_outer")
  .where(filterCondition)
  .show(false)
Output:
+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
|rowid |name |status |lastupdated|source|rowid |name |status |lastupdated|source|
+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
|1-za23f1|product2|inactive|31-12-2019 |rdd1 |1-za23f1|product2|active |31-12-2019 |rdd2 |
|1-za23f2|product3|inactive|01-01-2020 |rdd1 |1-za23f2|product3|active |01-01-2020 |rdd2 |
|1-za23f3|product4|inactive|02-01-2020 |rdd1 |1-za23f3|product1|inactive|02-01-2020 |rdd2 |
+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
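If a single key column is preferred in the output, the duplicated rowid columns can be collapsed with coalesce. A small follow-up sketch reusing the aliases and filterCondition from above; only a couple of columns are selected here for illustration:
import org.apache.spark.sql.functions.coalesce
df1.alias("l")
  .join(df2.alias("r"), $"l.rowid" === $"r.rowid", "full_outer")
  .where(filterCondition)
  .select(coalesce($"l.rowid", $"r.rowid").as("rowid"),
          $"l.status".as("status_left"),
          $"r.status".as("status_right"))
  .show(false)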
Upvotes: 0
Reputation: 136
You can read the data into DataFrames rather than RDDs, and then use a union and a group-by to achieve the result.
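A minimal sketch of that idea, assuming df1 and df2 share the same columns and each already carries a literal "source" column identifying where its rows came from:
import org.apache.spark.sql.functions.{col, collect_set, size}
//group on the data columns and keep rows that were only ever seen in one source
val dataCols = df1.columns.filter(_ != "source").map(col)
val mismatches = df1.union(df2)
  .groupBy(dataCols: _*)
  .agg(collect_set("source").as("sources"))
  .filter(size(col("sources")) === 1)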
Upvotes: 0
Reputation: 4045
Read the data into two DataFrames, df1 and df2, and add a source column with the default values rdd1 and rdd2 respectively. Then union df1 and df2, group by "rowid", "name", "status", "lastupdated", and collect the sources of each row as a set; a set with a single source means the row exists in only one of the inputs.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object OuterJoin {

  def main(args: Array[String]): Unit = {

    //local session for the example (the original used a project-specific helper to obtain it)
    val spark = SparkSession.builder().appName("OuterJoin").master("local[*]").getOrCreate()
    import spark.implicits._

    val cols = Array("rowid", "name", "status", "lastupdated")

    val df1 = List(
      ("1-za23f0", "product1", "active", "30-12-2019"),
      ("1-za23f1", "product2", "inactive", "31-12-2019"),
      ("1-za23f2", "product3", "inactive", "01-01-2020"),
      ("1-za23f3", "product4", "inactive", "02-01-2020"),
      ("1-za23f4", "product5", "inactive", "03-01-2020"))
      .toDF(cols: _*)
      .withColumn("source", lit("rdd1"))

    val df2 = List(
      ("1-za23f0", "product1", "active", "30-12-2019"),
      ("1-za23f1", "product2", "active", "31-12-2019"),
      ("1-za23f2", "product3", "active", "01-01-2020"),
      ("1-za23f3", "product1", "inactive", "02-01-2020"),
      ("1-za23f4", "product5", "inactive", "03-01-2020"))
      .toDF(cols: _*)
      .withColumn("source", lit("rdd2"))

    //union both inputs, group on the data columns, and keep only the rows
    //that were seen in a single source, i.e. the mismatched records
    df1.union(df2)
      .groupBy(cols.map(col): _*)
      .agg(collect_set("source").as("sources"))
      .filter(size(col("sources")) === 1)
      .withColumn("from_rdd", explode(col("sources")))
      .drop("sources")
      .show()
  }
}
Upvotes: 2