Reputation: 113
There are two very large RDDs (each with more than a million records). The first is:
rdd1.txt (name, value):
chr1 10016
chr1 10017
chr1 10018
chr1 20026
chr1 20036
chr1 25016
chr1 26026
chr2 40016
chr2 40116
chr2 50016
chr3 70016
rdd2.txt (name, min, max):
chr1 10000 20000
chr1 20000 30000
chr2 40000 50000
chr2 50000 60000
chr3 70000 80000
chr3 810001 910000
chr3 860001 960000
chr3 910001 1010000
A value from the first RDD is valid only when it falls between the min and max of a row with the same name in the second RDD; each valid value adds 1 to that name's count.
Taking the above as an example, chr1 occurs 7 times (all seven chr1 values fall inside one of the chr1 ranges).
How can I get this result in Scala with Spark?
Many thanks.
Upvotes: 3
Views: 454
Reputation: 4948
As I understand it, you want values from rdd1 that fall between min and max in rdd2. Please see if the below works:
val rdd1 = sc.parallelize(Seq(("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000)))
rdd1.toDF("name", "value")
  .join(rdd2.toDF("name", "min", "max"), Seq("name"))
  .where($"value".between($"min", $"max"))
  .groupBy($"name")
  .count()
  .show()
scala> val rdd1=sc.parallelize(Seq(("chr1", 10016 ),("chr1", 10017 ),("chr1", 10018 ),("chr1", 20026 ),("chr1", 20036 ),("chr1", 25016 ),("chr1", 26026),("chr2", 40016 ),("chr2", 40116 ),("chr2", 50016 ),("chr3", 70016 )))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Seq(("chr1", 10000, 20000),("chr1", 20000 , 30000),("chr2", 40000 ,50000),("chr2", 50000 ,60000),("chr3", 70000 ,80000),("chr3", 810001 ,910000),("chr3", 860001 ,960000),("chr3", 910001 ,1010000)))
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
+----+-----+
|name|count|
+----+-----+
|chr3| 1|
|chr1| 7|
|chr2| 3|
+----+-----+
Edit: If you are reading from a file, I would use the following:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // needed for the $"..." column syntax when not running in spark-shell
val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true)))
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true)))
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv")
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv")
// the schemas above already name the columns, so no toDF renaming is needed
rdd1.join(rdd2, Seq("name"))
  .where($"value".between($"min", $"max"))
  .groupBy($"name")
  .count()
  .show()
This will run on all nodes and there is no need for a parallelize call. Quoting the documentation here:
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Distribute a local Scala collection to form an RDD.
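As a side note, SQLContext and the com.databricks.spark.csv package are the Spark 1.x way of reading CSV. On Spark 2.x the same read would go through SparkSession's built-in csv source; a minimal sketch, assuming Spark 2.0+ and the same rdd1.csv / rdd2.csv files as above (the app name is arbitrary):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// app name is arbitrary; in spark-shell the session already exists as `spark`
val spark = SparkSession.builder().appName("range-count").getOrCreate()
import spark.implicits._

val nameValueSchema = StructType(Array(
  StructField("name", StringType, true),
  StructField("value", IntegerType, true)))
val nameMinMaxSchema = StructType(Array(
  StructField("name", StringType, true),
  StructField("min", IntegerType, true),
  StructField("max", IntegerType, true)))

// built-in csv source (Spark 2.x); add .option("sep", " ") if the files
// are space-separated as shown in the question
val df1 = spark.read.schema(nameValueSchema).csv("rdd1.csv")
val df2 = spark.read.schema(nameMinMaxSchema).csv("rdd2.csv")

df1.join(df2, Seq("name"))
  .where($"value".between($"min", $"max"))
  .groupBy($"name")
  .count()
  .show()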
Upvotes: 0
Reputation:
Try:
val rdd1 = sc.parallelize(Seq(
("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(
("chr1", 10000, 20000), ("chr1",20000, 30000)))
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name"))
.where($"value".between($"min", $"max"))
Upvotes: 2