PhiloJunkie

Reputation: 1169

What is the proper way to compute graph diameter in GraphX

I'm implementing an algorithm on GraphX for which I also need to compute the diameter of some relatively small graphs. The problem is that GraphX doesn't have any notion of undirected graphs, so the built-in method from ShortestPaths obviously finds the shortest directed paths. This doesn't help much in computing a graph diameter (the longest shortest undirected path between any pair of nodes).

I thought of duplicating the edges of my graph (instead of |E| I would have 2|E| edges), but I didn't feel it's the right way to do it. So, is there a better way to do it, notably on GraphX?
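Because the graphs are relatively small, the edge-duplication idea can at least be checked locally without Spark. The sketch below is plain Scala and assumes the edges have already been collected to the driver as `(srcId, dstId)` pairs (for example via `graph.edges.map(e => (e.srcId, e.dstId)).collect`). It inserts every edge in both directions, runs a breadth-first search from each vertex, and takes the largest eccentricity. The names `LocalDiameter`, `undirectedAdjacency`, `bfs` and `diameter` are hypothetical.

```scala
import scala.collection.immutable.Queue

object LocalDiameter {
  // Undirected adjacency: every directed edge is inserted in both
  // directions (the "2|E| edges" idea from the question).
  def undirectedAdjacency(edges: Seq[(Long, Long)]): Map[Long, Set[Long]] =
    (edges ++ edges.map(_.swap))
      .groupBy(_._1)
      .map { case (v, es) => v -> es.map(_._2).toSet }

  // Breadth-first search: hop distance from `source` to every reachable vertex.
  def bfs(adj: Map[Long, Set[Long]], source: Long): Map[Long, Int] = {
    @annotation.tailrec
    def loop(frontier: Queue[Long], dist: Map[Long, Int]): Map[Long, Int] =
      frontier.dequeueOption match {
        case None => dist
        case Some((v, rest)) =>
          // Unvisited neighbours are one hop further than v.
          val next = adj.getOrElse(v, Set.empty[Long]).filterNot(dist.contains)
          loop(rest ++ next, dist ++ next.map(_ -> (dist(v) + 1)))
      }
    loop(Queue(source), Map(source -> 0))
  }

  // Diameter = largest eccentricity. Unreachable pairs are ignored, so for a
  // disconnected graph this is the largest diameter of any component.
  def diameter(edges: Seq[(Long, Long)]): Int = {
    val adj = undirectedAdjacency(edges)
    adj.keys.map(v => bfs(adj, v).values.max).max
  }
}
```

For the directed edges (1, 2) and (3, 2), `LocalDiameter.diameter` returns 2, the length of the undirected path 1 - 2 - 3.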

Here is my code for a directed graph:

import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.lib.ShortestPaths

// computing the query diameter
def getDiameter(graph: Graph[String, Int]): Long = {
    // Get the ids of all vertices of the graph
    val vIds = graph.vertices.collect.toList.map(_._1)
    // Compute the shortest paths from every vertex to every landmark (here: all vertices)
    val shortestPaths = ShortestPaths.run(graph, vIds).vertices.collect
    // Extract only the distance values from the <VertexId, Map> tuples, where each map holds <landmark vertex, shortest directed distance>
    val values = shortestPaths.map(element => element._2).map(element => element.values)

    // The diameter is the longest shortest undirected distance between any pair of nodes in the graph
    // (ShortestPaths follows edge direction, so this actually yields the longest shortest directed distance)
    val diameter = values.map(m => m.max).max
    diameter
}

Upvotes: 0

Views: 783

Answers (1)

Tom Lous

Reputation: 2909

GraphX actually has no notion of direction if you don't tell it to use one. If you look at the inner workings of the ShortestPaths library, you'll see that it uses Pregel with the default direction (EdgeDirection.Either). This means that for all triplets it will add both source and destination to the active set. However, the sendMsg function of ShortestPaths only sends messages to the srcId (the source's map is updated from the destination's map plus one), so distances only travel against the edge direction, and a vertex that has only incoming edges never receives a message and is never reevaluated.
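To see why the message direction matters, here is a small local emulation in plain Scala (not GraphX) of synchronous message rounds over distance maps. With `bothWays = false`, each edge only updates its source from the destination's map plus one, mirroring the ShortestPaths pattern; with `bothWays = true`, both endpoints are updated. It iterates to a fixpoint instead of tracking Pregel's active set, and the names `MessageDirection`, `propagate`, `bothWays` and `maxDistance` are hypothetical.

```scala
object MessageDirection {
  type SPMap = Map[Long, Int]

  def increment(m: SPMap): SPMap = m.map { case (v, d) => v -> (d + 1) }

  // Pointwise minimum of two distance maps (same role as addMaps).
  def merge(a: SPMap, b: SPMap): SPMap =
    (a.keySet ++ b.keySet).map { k =>
      k -> math.min(a.getOrElse(k, Int.MaxValue), b.getOrElse(k, Int.MaxValue))
    }.toMap

  // Repeats synchronous rounds until no vertex map changes.
  // bothWays = false: only the edge source learns from dst + 1.
  // bothWays = true:  the destination also learns from src + 1.
  def propagate(edges: Seq[(Long, Long)], bothWays: Boolean): Map[Long, SPMap] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    var state: Map[Long, SPMap] = vertices.map(v => v -> Map(v -> 0)).toMap
    var changed = true
    while (changed) {
      // All messages of a round are computed from the previous round's state.
      val msgs: Seq[(Long, SPMap)] = edges.flatMap { case (s, d) =>
        val toSrc = Seq(s -> increment(state(d)))
        if (bothWays) toSrc :+ (d -> increment(state(s))) else toSrc
      }
      val next = msgs.foldLeft(state) { case (st, (v, m)) => st.updated(v, merge(st(v), m)) }
      changed = next != state
      state = next
    }
    state
  }

  // Largest finite distance found in any vertex's map.
  def maxDistance(state: Map[Long, SPMap]): Int = state.values.map(_.values.max).max
}
```

On the edges (1, 2) and (3, 2), the source-only variant leaves vertex 2's map at `Map(2 -> 0)` and reports 1, while the two-way variant reports the undirected diameter 2.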

Anyway, one solution is to write your own Diameter object/library, maybe looking like this (heavily based on ShortestPaths, so there may be even better solutions):

import scala.reflect.ClassTag
import org.apache.spark.graphx._

object Diameter extends Serializable {
  type SPMap = Map[VertexId, Int]

  def makeMap(x: (VertexId, Int)*) = Map(x: _*)

  def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

  def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
    }.toMap // collection.breakOut was removed in Scala 2.13, so build the map explicitly
  }

  // Removed landmarks, since all paths have to be taken into consideration
  def run[VD, ED: ClassTag](graph: Graph[VD, ED]): Int = {
    val spGraph = graph.mapVertices { (vid, _) => makeMap(vid -> 0)  }

    val initialMessage:SPMap = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      // added the concept of updating the dstMap based on the srcMap + 1
      val newSrcAttr = incrementMap(edge.dstAttr)
      val newDstAttr = incrementMap(edge.srcAttr)

      List(
       if (edge.srcAttr != addMaps(newSrcAttr, edge.srcAttr)) Some((edge.srcId, newSrcAttr)) else None,
       if (edge.dstAttr != addMaps(newDstAttr, edge.dstAttr)) Some((edge.dstId, newDstAttr)) else None
      ).flatten.toIterator
    }

    val pregel = Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)

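    // each vertex ends up with a map of its shortest distances to every reachable vertex;
    // the max of one map is only that vertex's eccentricity, so take the max over all vertices
    pregel.vertices.map { case (_, spmap) => spmap.values.max }.max()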
  }
}

val diameter = Diameter.run(graph)
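Since each vertex's map holds its distance to every vertex it can reach, `values.max` on a single map is only that vertex's eccentricity; the diameter is the maximum over all vertices. A quick all-pairs check with Floyd–Warshall (a different technique, used here only because it is easy to verify by hand; `AllPairs` and its methods are hypothetical names) shows the gap on the undirected path 0 - 1 - 2.

```scala
object AllPairs {
  // "Infinity" that can be added to itself without Int overflow.
  val Inf: Int = Int.MaxValue / 2

  // Floyd–Warshall over an undirected, unweighted graph with vertices 0 until n.
  def distances(n: Int, edges: Seq[(Int, Int)]): Array[Array[Int]] = {
    val d = Array.tabulate(n, n)((i, j) => if (i == j) 0 else Inf)
    // Each undirected edge is recorded in both directions.
    for ((a, b) <- edges if a != b) { d(a)(b) = 1; d(b)(a) = 1 }
    for (k <- 0 until n; i <- 0 until n; j <- 0 until n)
      if (d(i)(k) + d(k)(j) < d(i)(j)) d(i)(j) = d(i)(k) + d(k)(j)
    d
  }

  // Largest finite distance in row v = eccentricity of v.
  def eccentricity(d: Array[Array[Int]], v: Int): Int = d(v).filter(_ < Inf).max

  // Diameter = largest eccentricity over all vertices.
  def diameter(d: Array[Array[Int]]): Int = d.indices.map(eccentricity(d, _)).max
}

val d = AllPairs.distances(3, Seq(0 -> 1, 1 -> 2))
println(AllPairs.eccentricity(d, 1)) // 1: reading only the middle vertex
println(AllPairs.diameter(d))        // 2: the actual diameter
```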

Upvotes: 1
