Reputation: 181
I have a table with ids in two different columns, and another table that contains objects associated with those ids. I would like to keep only the rows of Table 2 whose id appears in either id1 or id2 of Table 1.
Table 1:
| id1 | id2 |
| 1 | 1 |
| 1 | 1 |
| 1 | 3 |
| 2 | 5 |
| 3 | 1 |
| 3 | 2 |
| 3 | 3 |
Table 2:
| id | obj |
| 1 | 'A' |
| 2 | 'B' |
| 3 | 'C' |
| 4 | 'D' |
| 5 | 'E' |
| 6 | 'F' |
| 7 | 'G' |
What I am thinking of is building a list of the unique ids from Table 1, which would be [1, 2, 3, 5] for the example above.
Then I would filter the data frame on the basis of that list, which gives the result:
| id | obj |
| 1 | 'A' |
| 2 | 'B' |
| 3 | 'C' |
| 5 | 'E' |
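For reference, a minimal sketch of this list-based idea, assuming a local SparkSession and the sample data above (the names `table1`, `table2`, `ids` and the app name are illustrative, not taken from real code). The `collect()` call is the step that pulls every distinct id onto the driver:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("IdListSketch") // hypothetical app name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val table1 = Seq((1, 1), (1, 1), (1, 3), (2, 5), (3, 1), (3, 2), (3, 3)).toDF("id1", "id2")
val table2 = Seq((1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E"), (6, "F"), (7, "G")).toDF("id", "obj")

// Stack id1 and id2 into one column, deduplicate, and bring the ids to the driver
val ids: Seq[Int] = table1.select($"id1")
  .union(table1.select($"id2"))
  .distinct()
  .as[Int]
  .collect()
  .toSeq

// Keep only the table2 rows whose id is in the collected list
val result = table2.filter($"id".isin(ids: _*))
result.orderBy("id").show()
```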
However, I have concerns about the scalability of this solution. The list can be large, and in some cases it may not even fit into memory. Any recommendations for this case?
Thanks.
Upvotes: 3
Views: 70
Reputation: 2828
The following approach would work:
import org.apache.spark.storage.StorageLevel
import spark.implicits._

val t1 = Seq((1,1),(1,1),(1,3),(2,5),(3,1),(3,2),(3,3))
val t2 = Seq((1,"A"),(2,"B"),(3,"C"),(4,"D"),(5,"E"),(6,"F"),(7,"G"))
val tt1 = sc.parallelize(t1).toDF("id1", "id2")
  .persist(StorageLevel.MEMORY_AND_DISK)
val tt2 = sc.parallelize(t2).toDF("id", "obj")
  .persist(StorageLevel.MEMORY_AND_DISK)
tt1.show()
tt2.show()
// Register both DataFrames as temporary views so they can be queried with SQL
tt1.createOrReplaceTempView("table1")
tt2.createOrReplaceTempView("table2")
// DISTINCT removes the duplicates produced when an id matches several rows of table1
val output = spark.sql(
  """
    |SELECT DISTINCT id, obj
    |FROM table1 t1
    |JOIN table2 t2 ON (t1.id1 = t2.id) OR (t1.id2 = t2.id)
    |ORDER BY id
    |""".stripMargin).persist(StorageLevel.MEMORY_AND_DISK)
output.show()
Output:
+---+---+
| id|obj|
+---+---+
| 1| A|
| 2| B|
| 3| C|
| 5| E|
+---+---+
For the memory concerns, you can persist the data to memory and disk, as done above. There are other storage levels as well, so you can choose the one that best fits your particular problem; see this link: RDD Persistence
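As a rough illustration of switching between storage levels, here is a minimal sketch assuming a local SparkSession and a small sample DataFrame (the app name and the variable names `cached` and `onDisk` are made up for the example). Which level is best depends on the data size and the memory available:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .appName("PersistSketch") // hypothetical app name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val table1 = Seq((1, 1), (1, 3), (2, 5), (3, 2)).toDF("id1", "id2")

// MEMORY_AND_DISK keeps partitions in memory and spills the ones that do not fit to disk
val cached = table1.persist(StorageLevel.MEMORY_AND_DISK)
cached.count() // persist is lazy; an action is needed to materialise the cache
println(cached.storageLevel)

// Release the cache before choosing a different level
cached.unpersist(blocking = true)

// DISK_ONLY avoids keeping the cached data in executor memory at all
val onDisk = table1.persist(StorageLevel.DISK_ONLY)
println(onDisk.storageLevel)
```

Calling `unpersist` once a DataFrame is no longer reused frees the storage for later stages.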
I would also consider tuning the number of partitions by configuring:
spark.sql.shuffle.partitions
/*
Configures the number of partitions to use when shuffling data for joins or aggregations.
*/
val spark = SparkSession
  .builder()
  .appName("MySparkProcess")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "400") // Change to a more reasonable default number of partitions for our data
  .config("spark.app.id", "MySparkProcess") // To silence Metrics warning
  .getOrCreate()
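The same setting can also be changed on an existing session at runtime. A minimal sketch, assuming a local session; the value 8 and the app name are arbitrary choices for illustration only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ShufflePartitionsSketch") // hypothetical app name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Applies to the shuffles of queries planned after this call; 8 is an illustrative value
spark.conf.set("spark.sql.shuffle.partitions", "8")

val df = Seq((1, "A"), (2, "B"), (3, "C")).toDF("id", "obj")
// groupBy triggers a shuffle, so the setting above influences its partitioning
val grouped = df.groupBy("id").count()
println(grouped.rdd.getNumPartitions)
```

In recent Spark versions adaptive query execution may coalesce small shuffle partitions, so the printed number can be lower than the configured one.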
I would also take a look at this link for further configuration:
Upvotes: 1
Reputation: 2451
Another approach:
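import org.apache.spark.sql.functions.{array, col, explode}
// Pack every column of table1 into one array, explode it to one id per row, then deduplicate
val id_table = table1.select(explode(array(col("*"))).as("id")).distinct()
// The inner join keeps only the table2 rows whose id appears in id_table
val result = table2.join(id_table, "id")
result.show()
Output: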
+---+---+
| id|obj|
+---+---+
| 1|'A'|
| 2|'B'|
| 3|'C'|
| 5|'E'|
+---+---+
Upvotes: 1
Reputation: 1868
Use Spark SQL. Note that joins in Spark come with a whole set of performance considerations, including DataFrame size and key distribution, so please familiarise yourself with them.
Generally, though:
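// "left_semi" keeps each table2 row that has at least one match in table1 and returns only table2's columns.
// (A "left" join filtered on $"t1.id1".isNull would instead return the ids that are absent from table1.)
table2.as("t2")
  .join(
    table1.as("t1"),
    $"t2.id" === $"t1.id1" || $"t2.id" === $"t1.id2",
    "left_semi"
  )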
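To make the difference between keeping and dropping the matching ids explicit, here is a minimal sketch contrasting the `left_semi` and `left_anti` join types on the question's sample data. The session setup and the names `cond`, `present` and `absent` are assumptions made for the example:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SemiAntiJoinSketch") // hypothetical app name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val table1 = Seq((1, 1), (1, 1), (1, 3), (2, 5), (3, 1), (3, 2), (3, 3)).toDF("id1", "id2")
val table2 = Seq((1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E"), (6, "F"), (7, "G")).toDF("id", "obj")

// A table2 row matches when its id equals either column of table1
val cond = $"t2.id" === $"t1.id1" || $"t2.id" === $"t1.id2"

// left_semi: table2 rows with at least one match, each returned once
val present = table2.as("t2").join(table1.as("t1"), cond, "left_semi")

// left_anti: table2 rows with no match at all
val absent = table2.as("t2").join(table1.as("t1"), cond, "left_anti")

present.orderBy("id").show()
absent.orderBy("id").show()
```

Because the condition is an OR rather than a plain equality, Spark cannot plan it as a hash or sort-merge join on a single key, so it is worth checking the physical plan with `explain()` on large inputs.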
Upvotes: 1