Reputation: 894
I want to process a big dataset using Spark and Scala as part of my analysis.
Sample Input
id, related_ids
a, "b,e,f,i,j"
b, "e,i,j,k,l"
c, "f,i,j,m,n"
d, "c,i,g,m,s"
Sample Output
a, "c,d" b, "a,c" c, "a,b" d, "NULL"
I thought of creating a data frame on top of it and doing operations, but I'm not able to move further after creating the data frame.
case class Record(id: Int, name: Int, name1: Int, name2: Int, name3: Int, name4: Int)

val input11 = sc.textFile(inputFile).map(x => x.replaceAll("\"", ""))
val input22 = input11.map(x => x.split(",")).collect {
  case Array(id, name, name1, name2, name3, name4) =>
    Record(id.toInt, name.toInt, name1.toInt, name2.toInt, name3.toInt, name4.toInt)
}
val tbl = input22.toDF()
tbl.registerTempTable("rawData")
val res = sqlContext.sql("select name from rawData")
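(For reference, since the related_ids list has a variable length, a fixed-width Record will not match every row. Below is a minimal sketch of one way the sample could be loaded into a DataFrame with an array column instead, assuming the same Spark 1.x sqlContext as above; the Related case class and column names are made up for illustration.)

import sqlContext.implicits._

// Hypothetical helper: an id plus the full list of related ids, whatever its length.
case class Related(id: String, relatedIds: Seq[String])

val rows = sc.textFile(inputFile)
  .filter(line => !line.startsWith("id"))     // skip the header row if present
  .map(_.replaceAll("\"", ""))                // drop the quotes around the list
  .map(_.split(",").map(_.trim))              // split the id and each related id
  .collect { case arr if arr.length > 1 => Related(arr.head, arr.tail) }

val df = rows.toDF()
df.registerTempTable("rawData")
sqlContext.sql("select id, relatedIds from rawData").show()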
Upvotes: 2
Views: 143
Reputation: 10406
You can get exactly what you want with the following code.
First, I import your data (with the ids encoded as integers):
val d = sc.parallelize(Seq(1 -> Seq(2,5,6,10,11),
                           2 -> Seq(5,10,11,15,16),
                           3 -> Seq(6,10,11,17,21),
                           4 -> Seq(3,10,12,17,22)))
Then I define a function that enumerates all the 2-tuples that can be created from an ordered list:
def expand(seq: Seq[Int]): Seq[(Int, Int)] =
  if (seq.isEmpty)
    Seq[(Int, Int)]()
  else
    seq.tail.map(x => seq.head -> x) ++ expand(seq.tail)
Example:
scala> expand(Seq(1,2,3,4))
res27: Seq[(Int, Int)] = List((1,2), (1,3), (1,4), (2,3), (2,4), (3,4))
And the final calculation (stored in connexions, which is reused below) goes as follows:
val p = 2

val connexions = d
  .flatMapValues(x => x)                           // (id, relatedId) pairs
  .map(_.swap)                                     // (relatedId, id)
  .groupByKey                                      // relatedId -> all ids referencing it
  .map(_._2)                                       // keep only the groups of ids
  .flatMap(x => expand(x.toSeq.sorted))            // every pair of ids sharing that relatedId
  .map(_ -> 1)
  .reduceByKey(_ + _)                              // count shared relatedIds per pair
  .filter(_._2 >= p)                               // keep pairs with at least p in common
  .map(_._1)
  .flatMap(x => Seq(x._1 -> x._2, x._2 -> x._1))   // make the relation symmetric
  .groupByKey.mapValues(_.toArray.sorted)
which, once collected, yields:
Array((1,Array(2, 3)), (2,Array(1, 3)), (3,Array(1, 2, 4)), (4,Array(3)))
Note, by the way, that you made a mistake in your example: 4 and 3 have 2 elements in common (10 and 17). With p = 3, you get:
Array((1,Array(2, 3)), (2,Array(1)), (3,Array(1)))
To also get the lines that do not have any "co-relations", join with the original data:
d
  .leftOuterJoin(connexions)
  .mapValues(x => x._1 -> x._2.getOrElse(null))
And you finally get (with p=3):
Array((1,(List(2, 5, 6, 10, 11),Array(2, 3))),
(2,(List(5, 10, 11, 15, 16),Array(1))),
(3,(List(6, 10, 11, 17, 21),Array(1))),
(4,(List(3, 10, 12, 17, 22),null)))
Nonetheless, if you want to study the connections between your data points in a more general way, I encourage you to have a look at Spark's graph API, GraphX. You might, for instance, be interested in computing the connected components of your graph.
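As a minimal sketch of that idea (assuming the integer ids used above can serve directly as GraphX vertex ids), something like the following would label each id with the smallest id of its connected component:

import org.apache.spark.graphx.Graph

// Build an edge list (id -> related id) from the same pairs as above;
// GraphX expects Long vertex ids.
val edges = d.flatMapValues(x => x).map { case (id, rel) => (id.toLong, rel.toLong) }

val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)

// Each vertex gets tagged with the lowest vertex id of its component.
graph.connectedComponents().vertices.collect.foreach(println)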
Upvotes: 1