Adelina Balasa

Reputation: 91

How to create a graph from a CSV file using Graph.fromEdgeTuples in Spark Scala

I'm new to Spark and Scala, and I'm trying to carry out a simple task of creating a graph from data in a text file.

From the documentation:

https://spark.apache.org/docs/0.9.0/api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED]%28RDD[Edge[ED]],VD%29%28ClassTag[VD],ClassTag[ED]%29:Graph[VD,ED]

I can see that a graph can be created from tuples of vertex IDs.

My simple text file looks like this, where each value is a vertex:

v1 v3
v2 v1
v3 v4
v4
v5 v3

When I read the data from the file with

val myVertices = myData.map(line => line.split(" "))

I get an RDD[Array[String]].

My questions are:

  1. If this is the right way to approach the problem, how do I turn the RDD[Array[String]] into the format the documentation requires, RDD[(VertexId, VertexId)]? (VertexId is a Long, and I am working with strings.)

  2. Is there an alternative, easier way to construct a graph from a CSV file with a similar structure?

Any suggestion would be very welcome. Thanks!

Upvotes: 4

Views: 6179

Answers (4)

udarajag

Reputation: 23

You can use a good hash function to convert each string value into a Long.
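A minimal sketch of this idea, assuming scala.util.hashing.MurmurHash3 from the Scala standard library is acceptable as the "good hash function". Because stringHash returns a 32-bit Int, the hypothetical helper toVertexId below combines two hashes computed with different (arbitrarily chosen) seeds into one 64-bit Long. That makes collisions far less likely than with a single 32-bit hash, but it does not rule them out.

```scala
import scala.util.hashing.MurmurHash3

object VertexIds {
  // Hypothetical helper: map a vertex name such as "v1" to a 64-bit ID.
  // The two seeds are arbitrary constants chosen for this sketch.
  private val HighSeed = 0x3c074a61
  private val LowSeed = 0x1b873593

  def toVertexId(name: String): Long = {
    val high = MurmurHash3.stringHash(name, HighSeed)
    val low = MurmurHash3.stringHash(name, LowSeed)
    // One 32-bit hash goes in the upper half, the other in the lower half;
    // the mask stops a negative `low` from sign-extending over `high`.
    (high.toLong << 32) | (low.toLong & 0xffffffffL)
  }
}
```

In Spark it would slot into the map step, e.g. file.map(_.split(" ")).map(a => (VertexIds.toVertexId(a(0)), VertexIds.toVertexId(a(1)))), giving the RDD[(VertexId, VertexId)] that Graph.fromEdgeTuples expects.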

Upvotes: 0

Robin East

Reputation: 156

If your file were in edge list format, e.g.

v1 v3
v2 v1
v3 v4
v5 v3

then you could simply use the following, which works out the vertices from the endpoints of the edges:

import org.apache.spark.graphx._
val graph = GraphLoader.edgeListFile(sc, "so_test.txt")

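However, as it stands, the 'v4' on its own line means that edgeListFile throws an exception. Note also that edgeListFile parses each vertex ID as a Long, so IDs such as "v1" would need their "v" prefix removed first.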

Upvotes: -1

Pawan B

Reputation: 4623

There are many ways to create a graph from a text file.

This code creates a graph using the Graph.fromEdgeTuples method:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD
import scala.util.hashing.MurmurHash3

object GraphFromFile {
  def main(args: Array[String]): Unit = {

    // create SparkContext
    val sparkConf = new SparkConf().setAppName("GraphFromFile").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    // read your file
    /* suppose your data is like
    v1 v3
    v2 v1
    v3 v4
    v4 v2
    v5 v3
    */
    val file = sc.textFile("src/main/resources/textFile1.csv")

    // create edge RDD of type RDD[(VertexId, VertexId)];
    // string vertex names are hashed to numeric IDs
    val edgesRDD: RDD[(VertexId, VertexId)] = file
      .map(line => line.trim.split("\\s+"))
      .filter(fields => fields.length >= 2) // skip blank or single-vertex lines
      .map(fields =>
        (MurmurHash3.stringHash(fields(0)).toLong, MurmurHash3.stringHash(fields(1)).toLong))

    // create a graph; every vertex gets the default attribute 1
    val graph = Graph.fromEdgeTuples(edgesRDD, 1)

    // you can see your graph
    graph.triplets.collect.foreach(println)

    sc.stop()
  }
}

A string hash (MurmurHash) is used because the file identifies vertices by strings such as "v1". If the IDs were already numeric, hashing would not be required.
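Hashing can, in principle, give two different names the same ID; with a 32-bit hash, collisions become likely once there are tens of thousands of distinct names (the birthday bound). If that matters, a different technique from the one in this answer, dictionary encoding, assigns each distinct name its own sequential ID. A minimal sketch on a plain Scala Seq of lines; the object name DenseIds is hypothetical. Unlike the edge-only approach, it also keeps a lone name such as "v4" in the dictionary.

```scala
object DenseIds {
  // Hypothetical helper: give every distinct vertex name its own Long ID
  // (0, 1, 2, ... in order of first appearance), so IDs never collide.
  // Returns the name -> ID dictionary and the edges as (srcId, dstId) pairs.
  def encode(lines: Seq[String]): (Map[String, Long], Seq[(Long, Long)]) = {
    // Split each line on whitespace, dropping empty tokens from blank lines.
    val tokens = lines.map(_.trim.split("\\s+").filter(_.nonEmpty).toSeq)

    // Every name on any line, including a lone "v4", gets an ID.
    val ids: Map[String, Long] =
      tokens.flatten.distinct.zipWithIndex.map { case (name, i) => name -> i.toLong }.toMap

    // Only lines naming exactly two vertices become edges.
    val edges = tokens.collect { case Seq(src, dst) => (ids(src), ids(dst)) }
    (ids, edges)
  }
}
```

For the question's file this maps v1, v3, v2, v4, v5 to 0 through 4. On an RDD, the same idea can be outlined with names.distinct().zipWithIndex(), which yields an RDD[(String, Long)] to join back onto the edges; treat that variant as an untested outline. A side benefit is that the names can be kept as vertex attributes, e.g. (id, name) pairs passed to Graph(...).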

Upvotes: 5

Phasmid

Reputation: 953

First of all, you should read and understand the GraphX programming guide: https://spark.apache.org/docs/1.1.0/graphx-programming-guide.html

Next, you need to determine what kind of Edge and Vertex you will represent in your graph. Given that you appear to have nothing to attach to your vertices and edges, it looks like you need something like:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

type MyVertex = (Long, Unit)

If you find you do have something, like a String, to attach to each vertex, then replace Unit by String and, in the following, replace () by an appropriate String.

Now you need an array (or other Seq) of vertices, which you then convert to an RDD, something like this:

val vertices: Seq[MyVertex] = Seq((1L, ()), (2L, ()), (3L, ()))
val rddVertices: RDD[(VertexId, Unit)] = sc.parallelize(vertices)

where sc is your instance of SparkContext. In practice, your vertices and edges would be read from your CSV file and converted to Longs. I won't detail that code, but it's simple enough, especially if you change the format of the CSV file to remove the "v" prefix from each vertex ID.
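A minimal sketch of the reading step this answer leaves out, run on a plain Scala Seq of lines rather than an RDD. It assumes every vertex name is "v" followed by digits, as in the question's file, and strips the prefix instead of editing the CSV; the object name VPrefixParser is hypothetical.

```scala
object VPrefixParser {
  // Same shape as MyVertex in the answer: (vertex ID, no attribute).
  type MyVertex = (Long, Unit)

  // Assumption: every vertex name is "v" followed by digits, e.g. "v12".
  def idOf(name: String): Long = name.stripPrefix("v").toLong

  // Returns every vertex that appears anywhere in the file (including a
  // lone "v4") and one (src, dst) pair per line that names two vertices.
  def parse(lines: Seq[String]): (Seq[MyVertex], Seq[(Long, Long)]) = {
    val tokens = lines.map(_.trim.split("\\s+").filter(_.nonEmpty).toSeq)
    val vertices = tokens.flatten.map(idOf).distinct.map(id => (id, ()))
    val edges = tokens.collect { case Seq(a, b) => (idOf(a), idOf(b)) }
    (vertices, edges)
  }
}
```

With Spark, the results could then be turned into the RDDs used here, e.g. sc.parallelize(vertices) for rddVertices and sc.parallelize(edges.map { case (s, d) => Edge(s, d, ()) }) for rddEdges. For files too large to read on the driver, the same split/filter/map steps can be applied to sc.textFile(...) directly.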

Similarly, you have to create the edges that you want:

type MyEdge = Edge[Unit]
val edge1: MyEdge = Edge(1L, 2L, ())
val edge2: MyEdge = Edge(2L, 3L, ())
val edges = Seq(edge1, edge2)
val rddEdges: RDD[MyEdge] = sc.parallelize(edges)

Finally, you create your graph:

val graph = Graph(rddVertices,rddEdges)

I have similar code in my own application, which I have tried to adapt to what you need, but I can't guarantee that this is perfect. It should get you started, though.

Upvotes: 0
