After parsing the graph from a file, I get a Map where each key represents a vertex (id) and each value represents an edge (id). To create the edges (Vx->Vy), we need to join the Map entries on their values (the edge ids). The goal is to create a GraphX graph from this representation.
Here is what I have so far:
tempLHM.foreach(x=>println(x))
(A.L0,A)
(B.L0,B)
(C.L0,C)
(D.L0,D)
(E.L0,E)
(a.L0M1,A)
(b.L0M1,B)
(c.L0M1,n4)
(a.L0M2,n4)
(b.L0M2,D)
(c.L0M2,n5)
(a.L0M3,n5)
(b.L0M3,C)
(c.L0M3,E)
Is there a direct way to map this hashmap to vertex and edge RDDs?
tempLHM is a mutable LinkedHashMap[String,String]. In the hashmap above, in the elements (A.L0,A) and (a.L0M1,A), A.L0 and a.L0M1 are keys (vertices) that are joined by the common value A (edge).
Here is what I want to derive:
val vertex: RDD[(VertexId, VertexName)], e.g. ((A.L0).Long, A.L0), ((a.L0M1).Long, a.L0M1), etc.
val edge: RDD[((VertexId1, VertexId2), EdgeName)], e.g. (((A.L0).Long, (a.L0M1).Long), A)
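A minimal sketch of that derivation with plain Scala collections (no Spark), assuming every edge id appears exactly twice, as it does in the printout above. The data is copied from that printout; `str2Long` is a hypothetical hash-based id helper (collisions are possible), and the edge tuples follow the ((id1, id2), EdgeName) shape described above.

```scala
import scala.collection.mutable.LinkedHashMap

// The question's tempLHM: vertex name -> edge id (copied from the printout)
val tempLHM = LinkedHashMap.empty[String, String]
tempLHM ++= Seq(
  "A.L0" -> "A", "B.L0" -> "B", "C.L0" -> "C", "D.L0" -> "D", "E.L0" -> "E",
  "a.L0M1" -> "A", "b.L0M1" -> "B", "c.L0M1" -> "n4",
  "a.L0M2" -> "n4", "b.L0M2" -> "D", "c.L0M2" -> "n5",
  "a.L0M3" -> "n5", "b.L0M3" -> "C", "c.L0M3" -> "E"
)

// Hypothetical helper: hash-based vertex id (distinct names may collide)
def str2Long(s: String): Long = s.##.toLong

// Vertices: one (id, name) pair per key
val vertices: List[(Long, String)] =
  tempLHM.keys.toList.map(name => (str2Long(name), name))

// Invert the map (edge id -> entries sharing it) and keep only
// edge ids that connect exactly two vertices
val edges: List[((Long, Long), String)] =
  tempLHM.toList
    .groupBy { case (_, edgeId) => edgeId }
    .toList
    .collect { case (edgeId, List((v1, _), (v2, _))) =>
      ((str2Long(v1), str2Long(v2)), edgeId)
    }

edges.foreach(println)
```

In spark-shell, `sc.parallelize(vertices)` and `sc.parallelize(edges.map { case ((src, dst), e) => Edge(src, dst, e) })` would turn these lists into the RDDs that GraphX expects.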
Assume you have this structure for your data.
val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")
There are two edges here: ("v1","v2") and ("v3","v4").
Assume you have a simple graph (not a hyper-graph, where a single edge can connect more than two nodes). That is, this solution assumes that an edge connects exactly two nodes and that each edge appears only once.
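Before relying on that assumption, it can help to check it. A small sketch in plain Scala, where `invalidEdges` is a hypothetical helper: it groups the vertex-to-edge map by edge id and reports every edge id that does not connect exactly two vertices, i.e. the cases the MultiMap code below does not handle.

```scala
// The answer's sample input: vertex -> edge id
val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")

// Hypothetical helper: edge ids whose vertex set does not have size 2
def invalidEdges(vertexToEdge: Map[String, String]): Map[String, Set[String]] =
  vertexToEdge.toSeq
    .groupBy { case (_, edgeId) => edgeId }
    .map { case (edgeId, pairs) => edgeId -> pairs.map(_._1).toSet }
    .filter { case (_, vs) => vs.size != 2 }

println(invalidEdges(d)) // Map()

// Adding a third vertex to e2 turns it into a hyper-edge, which gets reported
println(invalidEdges(d + ("v5" -> "e2")))
```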
import collection.mutable.{ HashMap, MultiMap, Set }
import java.security.MessageDigest
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
// A hacky way to go from String to Long, since GraphX needs Longs to
// represent vertex IDs. Distinct strings can share a hash code, so you
// might want to do something different here to make sure your IDs are unique.
def str2Long(s: String) = s.##.toLong
val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")
// We use a multi-map to create an inverse map (Edge->Set(Vertices))
val mm = new HashMap[String, Set[String]] with MultiMap[String, String]
d.foreach{ x => mm.addBinding(x._2,x._1) }
val edges = mm.map{ case(k,v) => Edge[String](str2Long(v.head),str2Long(v.last), k) }.toList
val vertices = d.keys.map(x => (str2Long(x), x)).toList
val edgeRdd = sc.parallelize(edges)
val vertexRdd = sc.parallelize(vertices)
val g = Graph(vertexRdd, edgeRdd)
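Because `str2Long` relies on `String.hashCode`, two different names can map to the same vertex ID ("Aa" and "BB" both hash to 2112). A sketch of one collision-free alternative, assuming the distinct names fit in memory as they do here: sort them and use each name's position as its ID. `assignIds` is a hypothetical helper, not part of GraphX.

```scala
// Hash-based ids as in the answer: cheap, but distinct strings can collide
def str2Long(s: String): Long = s.##.toLong

// "Aa" and "BB" are a well-known String.hashCode collision (both 2112)
val collides = str2Long("Aa") == str2Long("BB")

// Hypothetical helper: one dense index per distinct name.
// Sorting makes the assignment deterministic across runs.
def assignIds(names: Iterable[String]): Map[String, Long] =
  names.toSeq.distinct.sorted.zipWithIndex
    .map { case (name, idx) => name -> idx.toLong }
    .toMap

val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")
val ids = assignIds(d.keys)

// Vertices and edges built from the lookup table instead of hashes
val vertices = ids.toList.map { case (name, id) => (id, name) }
val edges = d.toList
  .groupBy { case (_, e) => e }
  .toList
  .collect { case (e, List((v1, _), (v2, _))) => (ids(v1), ids(v2), e) }

println(s"hash collision: $collides") // hash collision: true
```

On an RDD, `distinct().zipWithIndex()` gives a similar (unsorted) name-to-index mapping without collecting to the driver, at the cost of an extra join to translate the edge endpoints.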
If you print the vertices and the edges (in local mode; the order may vary), you get:
g.vertices.foreach(println)
g.edges.foreach(println)
(3709,v3)
(3707,v1)
(3708,v2)
(3710,v4)
Edge(3709,3710,e2)
Edge(3707,3708,e1)
Note: The solution above only works for data that fits in the memory of a single node. From your question, I see that you load the data into a local Map, so this solution should work for you. If you want to run this on a huge dataset across multiple nodes, it will not work.
This solution is more scalable than the one above. It always stays in the RDD domain, without collecting the graph at the driver (the first solution, for example, loads all the raw data into a Scala Map, which we avoid here). It also covers the case where different nodes share a common edge ID (in a hyper-graph-like way).
Let's assume that the text file has this format:
v1,e1
v2,e1
v3,e2
v4,e2
In the code below, we first read the raw data and then we transform them to the proper vertex and edge RDDs.
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD
def str2Long(s: String) = s.##.toLong
val rawData: RDD[String] = sc.textFile("...")
// Each line is "vertex,edge"; key the pairs by edge ID so we can join on it
val toBeJoined: RDD[(String, String)] =
  rawData.map(_.split(",")).map{ case Array(x,y) => (y,x) }
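`case Array(x,y)` throws a `MatchError` on any line that does not split into exactly two fields (a trailing blank line is enough). A hedged sketch of a more forgiving parser, shown on a hypothetical `Seq` of lines rather than an RDD: it trims whitespace and skips malformed lines with `collect`.

```scala
// Hypothetical sample lines in the "vertex,edge" format,
// including a blank line and a malformed one
val lines = Seq("v1,e1", "v2, e1", "", "v3,e2", "oops", "v4,e2")

// Same shape as toBeJoined: (edgeId, vertexName).
// collect keeps only lines that split into two non-empty fields
// instead of throwing a MatchError on the others.
def parse(lines: Seq[String]): Seq[(String, String)] =
  lines
    .map(_.split(",").map(_.trim))
    .collect { case Array(vertex, edge) if vertex.nonEmpty && edge.nonEmpty => (edge, vertex) }

parse(lines).foreach(println)
```

RDD also has a `collect` overload that takes a partial function, so the same body could be applied as `rawData.map(_.split(",").map(_.trim)).collect { ... }`; note that the no-argument `collect()` instead pulls the whole RDD to the driver.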
Note that the resulting graph will be bidirectional: if we have edge (v1,v2), we also have edge (v2,v1).
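To see what the self-join yields, here is a sketch that mimics `join` with plain Scala collections on hypothetical data where `e2` is a hyper-edge shared by three vertices. Filtering on `!=` (as in the answer) keeps both directions, so a hyper-edge with k vertices becomes k(k-1) directed edges; filtering on `<` instead is a variant, not part of the answer, that keeps each vertex pair only once.

```scala
// toBeJoined-style pairs: (edgeId, vertexName). e2 is a hyper-edge with three vertices.
val toBeJoined = Seq("e1" -> "v1", "e1" -> "v2", "e2" -> "v3", "e2" -> "v4", "e2" -> "v5")

// Plain-collection stand-in for RDD.join(self): every pair of vertices sharing an edge id
def selfJoin(pairs: Seq[(String, String)]): Seq[(String, (String, String))] =
  for {
    (edgeA, a) <- pairs
    (edgeB, b) <- pairs
    if edgeA == edgeB
  } yield (edgeA, (a, b))

// As in the answer: drop self-loops, keep both directions
val bidirectional = selfJoin(toBeJoined).filter { case (_, (a, b)) => a != b }

// Variant: keep each undirected pair once by ordering the endpoints
val oneDirection = selfJoin(toBeJoined).filter { case (_, (a, b)) => a < b }

println(s"bidirectional: ${bidirectional.size}, one direction: ${oneDirection.size}")
```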
val biDirectionalEdges: RDD[(String, (String, String))] =
  toBeJoined.join(toBeJoined).filter{ case(e,(v1,v2)) => v1 != v2 }
val edgeRdd =
biDirectionalEdges.map{ case(e,v) => Edge[String](str2Long(v._1),str2Long(v._2), e) }
// toBeJoined holds (edge, vertex) pairs, so the vertex name is the second element
val vertexRdd =
  toBeJoined.map(_._2).distinct.map(x => (str2Long(x), x))
val g = Graph(vertexRdd, edgeRdd)
// Verify that this is the right graph
g.vertices.take(10).foreach(println)
g.edges.take(10).foreach(println)