I'm new to Scala and am trying to read an undirected graph from a text file into a GraphX Graph. The text file has this format:
1,8,9,10
2,5,6,7,3,1
This means node 1 is connected to nodes 8, 9 and 10 (an adjacency list), and node 2 is connected to nodes 5, 6, 7, 3 and 1.
I'm trying to accomplish this with the GraphX method Graph.fromEdges[VD, ED], which means I have to pass in the edges (pairs of vertex IDs).
val graph = sc.textFile("Path to file").map(line => line.split(",").map(_.toLong)).map { case Array(a, z @ _*) => z.map(m => (a, m)) }
This gives me:
Vector((1,8), (1,9), (1,10))
Vector((2,5), (2,6), (2,7), (2,3), (2,1))
Since graph is of type Unit, it can't be used with GraphX's fromEdges method.
I am not able to figure out a way to make Edges from these. Is there a better way to do this?
Could anyone help me with this, or provide me with some resources that might help me?
There are probably many ways to write a Spark/Scala solution that loads a graph from the file format you specified.
Here is an example of a dynamic solution using RDDs:
// Importing GraphX (Graph, Edge, ...)
scala> import org.apache.spark.graphx._
// Loading sample data
scala> val graphData = sc.parallelize(Seq("1, 8, 9, 10", "2,5,6,7,3,1"))
graphData: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5]
// Trim whitespaces and map the String into an Array[Long]
scala> val graphList = graphData.map(x => {
     | x.replace(" ", "").split(",").map(_.toLong)
     | })
graphList: org.apache.spark.rdd.RDD[Array[Long]] = MapPartitionsRDD[6]
// Here is how graphList looks like now
scala> graphList.collect
res11: Array[Array[Long]] = Array(Array(1, 8, 9, 10), Array(2, 5, 6, 7, 3, 1))
// Generating edges by pairing element(0) with each of the remaining Array elements
scala> val edges = graphList.flatMap(x => x.drop(1).map(y => (x(0), y) )).map(x => Edge(x._1, x._2, "attr"))
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD
// edges.collect returns: Array(Edge(1,8,attr), Edge(1,9,attr), Edge(1,10,attr), Edge(2,5,attr), Edge(2,6,attr), Edge(2,7,attr), Edge(2,3,attr), Edge(2,1,attr))
// Generating vertices, and adding name/attr for each vertex
scala> val vertices = graphList.flatMap(x => x).map(x => (x, ("name", "attr"))).distinct.sortBy(x => x)
vertices: org.apache.spark.rdd.RDD[(Long, (String, String))] = MapPartitionsRDD
// Defining a default vertex attribute, used for any vertex that appears in an edge but not in the vertices RDD
scala> val default = ("Unknown", "Missing")
// Finally, declare your Graph
scala> val graph = Graph(vertices, edges, default)
graph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@8097e8f
// Checking what the vertices look like
scala> graph.vertices.collect
res26: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((8,(name,attr)), (1,(name,attr)), (9,(name,attr)), (10,(name,attr)), (2,(name,attr)), (3,(name,attr)), (5,(name,attr)), (6,(name,attr)), (7,(name,attr)))
Note that you should also consider partitioning (for parallelism) and caching (of the vertices and edges) to optimize your job further.
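None of the per-line logic in the transcript depends on Spark, so it can be sanity-checked on plain Scala collections before running it on a cluster. Below is a minimal sketch that mirrors the graphList, edges and vertices steps; the object and helper names (AdjacencyListSketch, parseLine, toEdges, toVertexIds) are hypothetical, and plain (Long, Long) tuples stand in for GraphX Edge objects so the code runs without Spark on the classpath.

```scala
object AdjacencyListSketch {
  // Mirrors the graphList step: strip spaces, split on commas, parse each ID as a Long.
  def parseLine(line: String): Array[Long] =
    line.replace(" ", "").split(",").map(_.toLong)

  // Mirrors the edges step: the first ID is the source and every remaining ID
  // becomes a destination. A (Long, Long) tuple stands in for a GraphX Edge.
  def toEdges(ids: Array[Long]): Seq[(Long, Long)] =
    ids.drop(1).toSeq.map(dst => (ids(0), dst))

  // Mirrors the vertices step without attributes: each ID once, in ascending order.
  def toVertexIds(lists: Seq[Array[Long]]): Seq[Long] =
    lists.flatMap(_.toSeq).distinct.sorted

  def main(args: Array[String]): Unit = {
    // Same sample lines as graphData; blank lines are skipped because "".toLong would throw.
    val lines = Seq("1, 8, 9, 10", "2,5,6,7,3,1").filter(_.trim.nonEmpty)
    val lists = lines.map(parseLine)
    println(lists.flatMap(toEdges))
    // List((1,8), (1,9), (1,10), (2,5), (2,6), (2,7), (2,3), (2,1))
    println(toVertexIds(lists))
    // List(1, 2, 3, 5, 6, 7, 8, 9, 10)
  }
}
```

Since RDD transformations accept ordinary Scala functions, the same helpers should be reusable inside graphData.map and graphList.flatMap once the local output looks right.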
A better way of constructing a Graph
GraphFrames is now a better alternative to GraphX: it benefits from the scalability and high performance of DataFrames.
I encourage you to read about it and start using it if possible.
A more native format to represent a graph for GraphX or GraphFrames
As an example, here is a vertex file that contains just six lines. Each vertex represents a person and has a vertex ID number, a name, and attributes, in this case an age value:
A second file, the edge file, contains a set of directed edges in the form: source vertex ID, destination vertex ID, relationship. For example, record 1 forms a Sister relationship between Flo and Mike:
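Now your code becomes as simple as:
// Assumes a SparkSession named spark and the GraphFrames package on the classpath
import org.graphframes.GraphFrame
// GraphFrames expects an "id" column in the vertex file and "src"/"dst" columns in the edge file
val vertex = spark.read.format("csv").option("header", "true").load("csvgraph1_vertex.csv")
val edges = spark.read.format("csv").option("header", "true").load("csvgraph1_edges.csv")
val graph = GraphFrame(vertex, edges)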
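The file in the question is an adjacency list rather than this two-file layout, so it would first need converting. A minimal plain-Scala sketch of that conversion, assuming the column names id (vertices) and src, dst (edges) that GraphFrames requires; the relationship column and its constant value "connected" are placeholders, because the question's file carries no edge labels, and the helper names toVertexCsv and toEdgeCsv are hypothetical. The vertex file produced here has only an id column, since the input has no names or ages.

```scala
object AdjacencyToCsvSketch {
  // Strip spaces, split on commas, parse each ID as a Long.
  private def parse(line: String): Array[Long] =
    line.replace(" ", "").split(",").map(_.toLong)

  // Vertex CSV: an "id" header, then every distinct vertex ID in ascending order.
  def toVertexCsv(lines: Seq[String]): String = {
    val ids = lines.flatMap(l => parse(l).toSeq).distinct.sorted
    ("id" +: ids.map(_.toString)).mkString("\n")
  }

  // Edge CSV: a "src,dst,relationship" header, then one row per adjacency entry.
  // The relationship value is a placeholder because the input carries no edge labels.
  def toEdgeCsv(lines: Seq[String], relationship: String = "connected"): String = {
    val rows = lines.flatMap { l =>
      val ids = parse(l)
      ids.drop(1).toSeq.map(dst => s"${ids(0)},$dst,$relationship")
    }
    ("src,dst,relationship" +: rows).mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("1,8,9,10", "2,5,6,7,3,1")
    println(toVertexCsv(lines))
    println(toEdgeCsv(lines))
  }
}
```

Writing the two strings to csvgraph1_vertex.csv and csvgraph1_edges.csv should let the GraphFrame loading code above read them unchanged (all columns are loaded as strings, since no schema is inferred).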
GraphFrames integrate with GraphX
GraphFrames fully integrate with GraphX via conversions between the two representations, without any data loss. We can convert our graphs to a GraphX graph and back to a GraphFrame.
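import org.apache.spark.graphx.Graph
import org.apache.spark.sql.Row
val gx: Graph[Row, Row] = graph.toGraphX // graph: the GraphFrame built above
val g2: GraphFrame = GraphFrame.fromGraphX(gx)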