data_addict

Reputation: 894

Spark data processing

I want to process Big Dataset using Spark and Scala as part of my analysis process.

Sample Input

id, related_ids
a, "b,e,f,i,j"
b, "e,i,j,k,l"
c, "f,i,j,m,n"
d, "c,i,g,m,s" 

Sample Output

a, "c,d" b, "a,c" c, "a,b" d, "NULL"

I thought of creating a DataFrame on top of the data and running operations on it, but I'm not able to move forward after creating the DataFrame.

case class Record(id: String, name: String, name1: String, name2: String, name3: String, name4: String)

// needed for .toDF() when this is not run in the spark-shell
import sqlContext.implicits._

// strip the quotes, then split each line into the id and its five related ids
val input11 = sc.textFile(inputFile).map(x => x.replaceAll("\"", ""))
val input22 = input11.map(x => x.split(",").map(_.trim)).collect {
  case Array(id, name, name1, name2, name3, name4) =>
    Record(id, name, name1, name2, name3, name4)
}

val tbl = input22.toDF()
tbl.registerTempTable("rawData")
val res = sqlContext.sql("select name from rawData")

Upvotes: 2

Views: 143

Answers (1)

Oli

Reputation: 10406

You can get exactly what you want with the following code:

First, I import your data (using integer ids in place of the letters):

val d = sc.parallelize(Seq(1 -> Seq(2,5,6,10,11), 
                           2 -> Seq(5,10,11,15,16), 
                           3 -> Seq(6,10,11,17,21), 
                           4 -> Seq(3,10,12,17,22)))
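
If you prefer to start from the original text file instead of hard-coding the data, the same pair RDD could be built along these lines (just a sketch, assuming the header row is dropped and the letter ids are converted to integers with a -> 1, b -> 2, ...):

// Sketch: build (id, Seq[Int]) pairs from the sample file, assuming a -> 1, b -> 2, ...
val toInt = (s: String) => s.trim.head - 'a' + 1

val d = sc.textFile(inputFile)
  .filter(!_.startsWith("id"))       // drop the header line
  .map(_.replaceAll("\"", ""))       // strip the quotes around related_ids
  .map(_.split(","))
  .map(fields => toInt(fields.head) -> fields.tail.map(toInt).toSeq)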

Then define a function that enumerates all the 2-tuples (pairs) that can be created from an ordered list:

def expand(seq : Seq[Int]): Seq[(Int, Int)] = 
    if (seq.isEmpty) 
         Seq[(Int, Int)]() 
    else 
         seq.tail.map(x=> seq.head -> x) ++ expand(seq.tail)

Example:

scala> expand(Seq(1,2,3,4))
res27: Seq[(Int, Int)] = List((1,2), (1,3), (1,4), (2,3), (2,4), (3,4))
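
Equivalently, expand could be written with Scala's built-in combinations, which produces the same pairs for a sorted input (a sketch):

// Sketch: same pair enumeration using Seq.combinations
def expand(seq: Seq[Int]): Seq[(Int, Int)] =
  seq.combinations(2).map { case Seq(a, b) => a -> b }.toSeq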

And the final calculation goes as follows (I keep the result as connexions so it can be joined with the original data below):

val p = 2
val connexions = d
     .flatMapValues(x => x)                          // one (id, related_id) row per relation
     .map(_.swap)                                    // key by related_id
     .groupByKey                                     // all ids sharing that related_id
     .map(_._2)
     .flatMap(x => expand(x.toSeq.sorted))           // every pair of ids with a common related_id
     .map(_ -> 1)
     .reduceByKey(_ + _)                             // count the common related_ids per pair
     .filter(_._2 >= p)                              // keep pairs with at least p in common
     .map(_._1)
     .flatMap(x => Seq(x._1 -> x._2, x._2 -> x._1))  // make the relation symmetric
     .groupByKey.mapValues(_.toArray.sorted)

which, once collected, yields:

Array((1,Array(2, 3)), (2,Array(1, 3)), (3,Array(1, 2, 4)), (4,Array(3)))

Note, by the way, that there is a mistake in your example: 3 and 4 have 2 elements in common (10 and 17). With p = 3, you get:

Array((1,Array(2, 3)), (2,Array(1)), (3,Array(1)))
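
For a sample this small, you can also check the pairwise overlaps directly, for instance with a cartesian product (just a verification sketch, not something to run on a big dataset):

// Sketch: compute the intersection size for every pair of ids on the small sample
d.cartesian(d)
 .filter { case ((i, _), (j, _)) => i < j }
 .map { case ((i, si), (j, sj)) => (i, j) -> si.intersect(sj).size }
 .collect()
// e.g. (3,4) -> 2 because ids 3 and 4 share 10 and 17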

To also get the lines that do not have any "co_relations", join with the original data:

d
    .leftOuterJoin(connexions)
    .mapValues(x=> x._1 -> x._2.getOrElse(null))

And you finally get (with p=3):

Array((1,(List(2, 5, 6, 10, 11),Array(2, 3))), 
      (2,(List(5, 10, 11, 15, 16),Array(1))), 
      (3,(List(6, 10, 11, 17, 21),Array(1))), 
      (4,(List(3, 10, 12, 17, 22),null)))
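
If you then want the output in the same shape as your sample (letter ids and the literal string "NULL"), something along these lines could work (a sketch, assuming the simple 1 -> "a", 2 -> "b", ... mapping for the ids):

// Sketch: map integer ids back to letters and print missing connexions as "NULL"
val letter = (n: Int) => ('a' + n - 1).toChar.toString

d.leftOuterJoin(connexions)
 .map { case (id, (_, related)) =>
   letter(id) -> related.map(_.map(letter).mkString(",")).getOrElse("NULL")
 }
 .collect()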

Nonetheless, if you want to study connections between your data points in a more general way, I encourage you to have a look at Spark's graph API, GraphX. You might for instance be interested in computing the connected components of your graph.
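
For instance, starting from the pairs kept above, connected components could be computed with something like this (a rough sketch, not part of the solution above):

import org.apache.spark.graphx.Graph

// Sketch: one edge per pair of ids that share at least p related_ids
val edges = connexions.flatMap { case (id, related) =>
  related.map(r => id.toLong -> r.toLong)
}
val graph = Graph.fromEdgeTuples(edges, 0)

// each vertex ends up labelled with the smallest vertex id of its component
graph.connectedComponents().vertices.collect()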

Upvotes: 1
