Junaid

Reputation: 181

Spark: Merging 2 columns of a DataSet into a single column

I have a table with ids in 2 different columns. I have another table which contains objects associated with those ids. I would like to keep only the rows of Table 2 whose id exists in either id1 or id2 of Table 1.

Table 1:

| id1  | id2 |
|  1   |  1  |
|  1   |  1  |
|  1   |  3  |
|  2   |  5  |
|  3   |  1  | 
|  3   |  2  |
|  3   |  3  |

Table 2:

| id  | obj   |
|  1  |  'A'  |
|  2  |  'B'  |
|  3  |  'C'  |
|  4  |  'D'  | 
|  5  |  'E'  |  
|  6  |  'F'  |
|  7  |  'G'  |

What I am thinking is to create a list of the unique ids from Table 1, which would be [1, 2, 3, 5] in the above example.

Then filter the Table 2 data frame based on that list, which would give the result:

| id  | obj   |
|  1  |  'A'  |
|  2  |  'B'  |
|  3  |  'C'  |
|  5  |  'E'  |  

However, I have concerns about the scalability of this solution. The list can be large and in some cases may not even fit in memory. Any recommendations for this case?
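
For illustration, a rough sketch of what I have in mind (assuming table1 and table2 are already loaded as DataFrames with the columns above); collecting the distinct ids to the driver is exactly where the memory concern comes in:

import org.apache.spark.sql.functions.col

// build the distinct list of ids on the driver (union is positional, so the
// result keeps the first projection's column name), then filter table2 against it
val ids = table1.select("id1")
  .union(table1.select("id2"))
  .distinct()
  .collect()
  .map(_.getInt(0))

val filtered = table2.filter(col("id").isin(ids: _*))
filtered.show()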

Thanks.

Upvotes: 3

Views: 70

Answers (3)

Chema

Reputation: 2828

The following approach would work:

import org.apache.spark.storage.StorageLevel
import spark.implicits._

val t1 = Seq((1,1),(1,1),(1,3),(2,5),(3,1),(3,2),(3,3))
val t2 = Seq((1,"A"),(2,"B"),(3,"C"),(4,"D"),(5,"E"),(6,"F"),(7,"G"))

val tt1 = t1.toDF("id1", "id2").persist(StorageLevel.MEMORY_AND_DISK)
val tt2 = t2.toDF("id", "obj").persist(StorageLevel.MEMORY_AND_DISK)

tt1.show()
tt2.show()

tt1.createOrReplaceTempView("table1")
tt2.createOrReplaceTempView("table2")

// match table2.id against either column of table1; DISTINCT removes the
// duplicates produced when an id appears in both columns or in several rows
val output = spark.sql(
  """
    |SELECT DISTINCT id, obj
    |FROM table1 t1
    |JOIN table2 t2 ON (t1.id1 = t2.id) OR (t1.id2 = t2.id)
    |ORDER BY id
    |""".stripMargin).persist(StorageLevel.MEMORY_AND_DISK)

output.show()

output:

+---+---+
| id|obj|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
|  5|  E|
+---+---+

For memory issues you can persist the data to memory and disk. There are more storage-level options though; choose the one that best fits your particular problem. You can follow this link: RDD Persistence
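
For illustration only (the DataFrame name below is hypothetical, just to show the call), a quick sketch of how a different storage level is selected:

import org.apache.spark.storage.StorageLevel

// Pick one level per DataFrame (a level can only be assigned once).
// Common options -- see the RDD Persistence docs for the full list:
//   MEMORY_ONLY          -- deserialized in memory, recomputed if evicted
//   MEMORY_AND_DISK      -- spills partitions to disk when memory is full
//   MEMORY_AND_DISK_SER  -- serialized in memory: less space, more CPU
//   DISK_ONLY            -- no memory pressure, slower to read back
val cached = spark.range(100).toDF("id")   // hypothetical DataFrame, for illustration
  .persist(StorageLevel.MEMORY_AND_DISK_SER)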

I would also consider tuning the number of partitions by configuring:

spark.sql.shuffle.partitions
/*
Configures the number of partitions to use when shuffling data for joins or aggregations.
*/

  val spark = SparkSession
    .builder()
    .appName("MySparkProcess")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","400") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","MySparkProcess") // To silence Metrics warning
    .getOrCreate()

I would also take a look at this link for further configuration:

Performance Tuning

Upvotes: 1

chlebek

Reputation: 2451

Another approach:

import spark.implicits._
import org.apache.spark.sql.functions.{array, explode}

// put id1 and id2 into a single "id" column, one row per value, then deduplicate
val id_table = table1.select(explode(array('*)).as("id")).distinct()
val result = table2.join(id_table, "id")
result.show()

output:

+---+---+
| id|obj|
+---+---+
|  1|'A'|
|  2|'B'|
|  3|'C'|
|  5|'E'|
+---+---+
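
An equivalent way to build the same single id column, sketched here assuming the same table1 and table2 DataFrames, is to union the two columns and deduplicate:

import spark.implicits._

// union is positional, so the result keeps the first projection's column name ("id")
val id_table = table1.select($"id1".as("id"))
  .union(table1.select($"id2"))
  .distinct()

val result = table2.join(id_table, "id")
result.show()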

Upvotes: 1

Terry Dactyl

Reputation: 1868

Use Spark SQL. Note: joins in Spark come with a whole set of performance considerations, including DataFrame size, key distribution, etc., so please familiarise yourself with them.

Generally though:

table2.as("t2")
  .join(
    table1.as("t1"),
    $"t2.id" === $"t1.id1" || $"t2.id" === $"t1.id2",
    "left"
  )
  .where($"t1.id1".isNull)
  .select("t2.*")

Upvotes: 1
