Bigo

Reputation: 113

Compare two Spark RDDs to check that values from the first fall within the ranges of the second

There are two very large RDDs (each with more than a million records). The first is:

rdd1.txt (name, value):
chr1    10016 
chr1    10017 
chr1    10018 
chr1    20026 
chr1    20036 
chr1    25016 
chr1    26026
chr2    40016 
chr2    40116 
chr2    50016 
chr3    70016 

rdd2.txt (name, min, max):
chr1     10000  20000
chr1     20000  30000
chr2     40000  50000
chr2     50000  60000
chr3     70000  80000
chr3    810001  910000
chr3    860001  960000
chr3    910001  1010000

A value is valid only when it falls between the min and max of a row with the same name in the second RDD; each valid value adds 1 to that name's count.

Taking the above as an example, chr1's count is 7.
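(For the rest of the sample, chr2's count would be 3 and chr3's would be 1.)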

How can I get the result in Scala with Spark?

Many thanks.

Upvotes: 3

Views: 454

Answers (2)

Ramachandran.A.G

Reputation: 4948

As I understand it, you want values from rdd1 that fall between min and max in rdd2. Please see if the below works:

val rdd1 = sc.parallelize(Seq(("chr1", 10016), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1", 20000, 30000)))
// Join on name, keep values inside [min, max], then count valid values per name.
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name"))
  .where($"value".between($"min", $"max")).groupBy($"name").count().show()
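One caveat: toDF and the $"col" syntax rely on implicit conversions that the spark-shell imports automatically. In a standalone application, assuming a SparkSession named spark, an import along these lines is needed first:

// Brings toDF and the $"col" column syntax into scope (assumes a SparkSession named `spark`).
import spark.implicits._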


scala> val rdd1=sc.parallelize(Seq(("chr1",    10016 ),("chr1",    10017 ),("chr1",    10018 ),("chr1",    20026 ),("chr1",    20036 ),("chr1",    25016 ),("chr1",    26026),("chr2",    40016 ),("chr2",    40116 ),("chr2",    50016 ),("chr3",    70016 )))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val rdd2=sc.parallelize(Seq(("chr1",     10000,  20000),("chr1",     20000 , 30000),("chr2",     40000  ,50000),("chr2",     50000  ,60000),("chr3",     70000  ,80000),("chr3",    810001  ,910000),("chr3",    860001  ,960000),("chr3",    910001  ,1010000)))
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24


scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
+----+-----+
|name|count|
+----+-----+
|chr3|    1|
|chr1|    7|
|chr2|    3|
+----+-----+

Edit: if you are reading from a file, I would use the following:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // needed for the $"col" syntax outside the shell

// Explicit schemas for the two headerless CSV files.
val nameValueSchema = StructType(Array(StructField("name", StringType, true), StructField("value", IntegerType, true)))
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true), StructField("min", IntegerType, true), StructField("max", IntegerType, true)))

// Note: despite the names, rdd1 and rdd2 are DataFrames here.
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv")
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv")
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
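On Spark 2.0 or later, the built-in CSV reader should also work in place of the com.databricks.spark.csv package; for example, assuming a SparkSession named spark:

// Assumes Spark 2.0+; the external spark-csv package is not needed.
val rdd1 = spark.read.schema(nameValueSchema).csv("rdd1.csv")
val rdd2 = spark.read.schema(nameMinMaxSchema).csv("rdd2.csv")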

This will run on all nodes and there is no need for a parallelize call. Quoting the documentation:

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Distribute a local Scala collection to form an RDD.
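If you would rather stay at the RDD level instead of converting to DataFrames, a minimal sketch of the same join-and-filter logic, assuming rdd1 and rdd2 hold the (name, value) and (name, min, max) tuples from the parallelize example above, could look like this:

// Key the ranges by name, join with the values, keep values inside [min, max],
// then count valid values per name.
val ranges = rdd2.map { case (name, min, max) => (name, (min, max)) }
val counts = rdd1.join(ranges)
  .filter { case (_, (value, (min, max))) => value >= min && value <= max }
  .map { case (name, _) => (name, 1L) }
  .reduceByKey(_ + _)
counts.collect().foreach(println) // e.g. (chr1,7), (chr2,3), (chr3,1)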

Upvotes: 0

user6022341

Reputation:

Try:

val rdd1 = sc.parallelize(Seq(
  ("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(
   ("chr1", 10000, 20000), ("chr1",20000, 30000)))

rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name"))
 .where($"value".between($"min", $"max"))

Upvotes: 2
