amelie

Reputation: 25

How can I remove duplicate tuples with Scala? Cartesian Scala Spark

I have an RDD containing some protein names and their domains. I used the cartesian function to determine the possible pairs, but unfortunately I obtained duplicated pairs as a result. How can I keep only one tuple of each pair and remove the duplicated one? Here's an example:

+------------------------------------+------------------------------------+
|             Protein1               |                Protein2            |
+------------------------------------+------------------------------------+
|(P0C2L1,IPR0179)                    |(P0CW05,IPR004372;IPR000890)        |
|(P0CW05,IPR004372;IPR000890)        |(P0C2L1,IPR0179)                    |
|(B2UDV1,IPR0104)                    |(Q4R8P0,IPR029058;IPR000073;IPR0266)| 
|(Q4R8P0,IPR029058;IPR000073;IPR0266)|(B2UDV1,IPR0104)                    |
+------------------------------------+------------------------------------+

I would like to have:

+------------------------------------+------------------------------------+
|             Protein1               |                Protein2            |
+------------------------------------+------------------------------------+
|(P0C2L1,IPR0179)                    |(P0CW05,IPR004372;IPR000890)        |
|(B2UDV1,IPR0104)                    |(Q4R8P0,IPR029058;IPR000073;IPR0266)| 
+------------------------------------+------------------------------------+
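To make the setup concrete, here is a minimal sketch that reproduces the duplicated pairs (the data is taken from the table above; sc is assumed to be an active SparkContext):

import org.apache.spark.rdd.RDD

// (proteinName, domains) records from the example above
val proteins: RDD[(String, String)] = sc.parallelize(Seq(
  ("P0C2L1", "IPR0179"),
  ("P0CW05", "IPR004372;IPR000890")))

// cartesian yields every ordered pair, so both (a, b) and (b, a) appear,
// along with self-pairs (a, a) that also have to be filtered out
val pairs = proteins.cartesian(proteins)
  .filter { case (a, b) => a != b }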

Upvotes: 2

Views: 242

Answers (2)

Manish

Reputation: 1157

I have assumed input data based on the information provided and implemented the solution below.

It does the following:

  1. Converts the RDD to a Spark DataFrame.
  2. Swaps the two columns of any row where length(Protein1) > length(Protein2), so both copies of a pair end up in the same order.
  3. Uses the dropDuplicates method to remove the duplicates.
  4. Stores the result in a DataFrame and then in an RDD.

Note: for this to work, the two strings in each pair must differ in length, so that length(Protein1) > length(Protein2) holds for exactly one ordering of the pair. If the OP provides more clarity on the input data, I will work on more solutions.

import org.apache.spark.rdd.RDD

//Creating the paired RDD as provided by OP
var x: RDD[(String, String)] = sc.parallelize(Seq(
  ("P0C2L1,IPR0179", "P0CW05,IPR004372;IPR000890"),
  ("P0CW05,IPR004372;IPR000890", "P0C2L1,IPR0179"),
  ("B2UDV1,IPR0104", "Q4R8P0,IPR029058;IPR000073;IPR0266"),
  ("Q4R8P0,IPR029058;IPR000073;IPR0266", "B2UDV1,IPR0104")))

//Creating a Spark dataframe out of this RDD
var combDF = spark.createDataFrame(x).toDF("Protein1","Protein2")
combDF.show(20,false)

//+------------------------------------+------------------------------------+
//|Protein1                            |Protein2                            |
//+------------------------------------+------------------------------------+
//|(P0C2L1,IPR0179)                    |(P0CW05,IPR004372;IPR000890)        |
//|(P0CW05,IPR004372;IPR000890)        |(P0C2L1,IPR0179)                    |
//|(B2UDV1,IPR0104)                    |(Q4R8P0,IPR029058;IPR000073;IPR0266)|
//|(Q4R8P0,IPR029058;IPR000073;IPR0266)|(B2UDV1,IPR0104)                    |
//+------------------------------------+------------------------------------+

// creating temporary views
combDF.createOrReplaceTempView("combDF")

// The statement below is only required for this example, to cast the strings to structs
combDF = spark.sql("""select named_struct("col1", element_at(split(Protein1,","),1), "col2",  element_at(split(Protein1,","),2)) as Protein1,
                       named_struct("col1", element_at(split(Protein2,","),1), "col2",  element_at(split(Protein2,","),2)) as Protein2
                 from combDF""")
//end 

combDF.createOrReplaceTempView("combDF")
combDF.show()
var result = spark.sql("""
  |select case when length(Protein1_m) > length(Protein2_m) then element_at(protein_array, 2) 
  |            else element_at(protein_array, 1) 
  |            end as Protein1,
  |       case when length(Protein1_m) > length(Protein2_m) then element_at(protein_array, 1) 
  |            else element_at(protein_array, 2) 
  |            end as Protein2
  |from 
  |(select Protein1, Protein2, cast(Protein1 as string) as Protein1_m, cast(Protein2 as string) as Protein2_m,
  |        array(Protein1,Protein2) as protein_array
  |from combDF) a
""".stripMargin).dropDuplicates()
// Result in spark dataframe
result.show(20,false)

//+-----------------+-------------------------------------+
//|Protein1         |Protein2                             |
//+-----------------+-------------------------------------+
//|(B2UDV1,IPR0104) |(Q4R8P0,IPR029058;IPR000073;IPR0266) |
//|(P0C2L1,IPR0179) |(P0CW05,IPR004372;IPR000890)         |
//+-----------------+-------------------------------------+

// result in RDD
var resultRDD = result.rdd
resultRDD.collect().foreach(println)

//[(B2UDV1,IPR0104),(Q4R8P0,IPR029058;IPR000073;IPR0266)]   
//[(P0C2L1,IPR0179),(P0CW05,IPR004372;IPR000890)]

Upvotes: 4

Chema

Reputation: 2828

One approach would be to transform that RDD into a DataFrame and use built-in functions to remove duplicates.

Duplicate rows can be removed from a Spark DataFrame using the distinct() and dropDuplicates() functions: distinct() removes rows that have the same values in all columns, whereas dropDuplicates() removes rows that have the same values in selected columns.
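For example, here is a minimal sketch of this approach (assuming the paired RDD x from the question and an active SparkSession spark; the least/greatest step is my own addition to put each pair into a canonical order, so that (a, b) and (b, a) collapse to the same row before deduplication):

import org.apache.spark.sql.functions.{col, greatest, least}
import spark.implicits._

// x: RDD[(String, String)] as in the question
val df = x.toDF("Protein1", "Protein2")

// Order the two columns canonically, then drop exact duplicates
val deduped = df
  .select(
    least(col("Protein1"), col("Protein2")).as("Protein1"),
    greatest(col("Protein1"), col("Protein2")).as("Protein2"))
  .dropDuplicates("Protein1", "Protein2")

deduped.show(false)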

Upvotes: 1
