Stéphanie C
Stéphanie C

Reputation: 829

how to build a graph from tuples in graphx and label the nodes after ?

Some context can be found here, the idea is that I have created a graph from tuples collected from a request on a Hive table. Those correspond to trade relations between countries. Having built the graph this way, the vertices are not labelled. I want to study the distribution of degrees and get the most connected countries' names. I tried 2 options :

In both cases I get the following error : the task is not serializable

Global code :

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line=>(line(2),line(3))) //pays->pays 

couples look like this: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB),...

val idMap = sc.broadcast(couples 
.flatMap{case (x: String, y: String) => Seq(x, y)}
.distinct 
.zipWithIndex  
.map{case (k, v) => (k, v.toLong)}  
.toMap) 

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
.map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})

val graph = Graph.fromEdgeTuples(edges, 1)

built this way, vertices look like (68,1) for example

val degrees: VertexRDD[Int] = graph.degrees.cache()

//Most connected vertices 
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, k) => (id.toInt, degree)}
val ord = Ordering.by[(Int, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

We get : (79,1016),(64,912),(55,889)...

First option to retrieve the names :

val idMapbis = sc.parallelize(couples
.flatMap{case (x: String, y: String) => Seq(x, y)} 
.distinct 
.zipWithIndex  
.map{case (k, v) => (v,k)}  
.toMap)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]):  Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (idMapbis.value(id.toInt), degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

The task is not serializable but the function idMapbis is working since there is no error with idMapbis.value(graph.vertices.take(1)(0)._1.toInt)

Option 2:

graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}

The task is not serializable again (for context here is how topNamesAndDegrees is modified to obtain the names of the most connected vertices in this option)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (name, degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

I am interested in understanding how to improve one of this option, maybe both if someone see how.

Upvotes: 7

Views: 1700

Answers (1)

zero323
zero323

Reputation: 330123

Problem with your attempts is that idMapbis is an RDD. Since we already know your data fits into memory you can simply use a broadcast variable as before:

val idMapRev = sc.broadcast(idMap.value.map{case (k, v) => (v, k)}.toMap)
graph.mapVertices{case (id, _) => idMapRev.value(id)}

Alternatively you could use the correct labels from the beginning:

val countries: RDD[(VertexId, String)] = sc
  .parallelize(idMap.value.map(_.swap).toSeq)

val relationships: RDD[Edge[Int]] = sc.parallelize(couples
 .map{case (x: String, y: String) => Edge(idMap.value(x), idMap.value(y), 1)}
)

val graph = Graph(countries, relationships)

The second approach has one important advantage - if graph is large you relatively easily replace broadcast variables with joins.

Upvotes: 4

Related Questions