kstanisz

Reputation: 227

Breadth First Search algorithm using Apache Spark GraphX

I'm trying to implement the BFS (Breadth First Search) algorithm using Apache Spark GraphX.

This is my current implementation:

object BFSAlgorithm {

  def run(graph: Graph[VertexId, Int], sourceVertex: VertexId): Graph[Int, Int] = {

    val bfsGraph: Graph[Int, Int] = graph.mapVertices((vertex, _) =>
      if (vertex == sourceVertex) {
        0
      } else {
        Int.MaxValue
      }
    )

    var queue: Queue[VertexId] = Queue[VertexId](sourceVertex)
    while(queue.nonEmpty){
      val currentVertexId = queue.dequeue()
      val neighbours: RDD[EdgeTriplet[Int, Int]] = bfsGraph.triplets.filter(_.srcId == currentVertexId)
      for(triplet <- neighbours){
        if(triplet.dstAttr == Int.MaxValue){
          queue += triplet.dstId
        }
        val distance = triplet.srcAttr + 1
        if(distance < triplet.dstAttr){
          // Update vertex attribute
          bfsGraph.mapVertices((vertex, _) => if(vertex == triplet.dstId) distance else triplet.dstAttr)
        }
      }
    }
    bfsGraph
  }

}

I'm getting a NullPointerException when I try to update the vertex attribute in this line:

bfsGraph.mapVertices((vertex, _) => if(vertex == triplet.dstId) distance else triplet.dstAttr)

I'm confused, because inside the for loop bfsGraph.vertices is null.

Can anyone explain why? What is the best way to update a vertex attribute in a graph?

Upvotes: 2

Views: 1646

Answers (1)

PhiloJunkie

Reputation: 1169

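This implementation does not work because you are trying to access an RDD from inside an operation on another RDD. The for loop over neighbours is a foreach on an RDD, so Spark serializes the loop body as a closure, together with every variable it references, and runs it on the executors. That closure references bfsGraph, which is itself backed by RDDs, and RDDs can only be used from the driver. On the executors the graph's vertices are null, which results in the NullPointerException. Note also that mapVertices returns a new graph rather than modifying bfsGraph in place, so its result is discarded here, and that elements added to queue inside the loop are not reliably visible on the driver.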
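One way to avoid the nested RDD access is to express BFS with GraphX's Pregel operator, which propagates distances through messages instead of a driver-side queue. The following is only a minimal sketch, assuming edges are followed from source to destination as in the question; the object name BFSPregel is made up for illustration, and the code has not been run against your graph.

```scala
import org.apache.spark.graphx.{Graph, VertexId}

// Hypothetical name; mirrors the signature of BFSAlgorithm.run from the question.
object BFSPregel {

  def run(graph: Graph[VertexId, Int], sourceVertex: VertexId): Graph[Int, Int] = {
    // The source starts at distance 0; every other vertex is marked "unreached".
    val initial: Graph[Int, Int] = graph.mapVertices((id, _) =>
      if (id == sourceVertex) 0 else Int.MaxValue
    )

    // The initial message Int.MaxValue leaves every starting distance unchanged.
    initial.pregel(Int.MaxValue)(
      // Vertex program: keep the smaller of the current and the incoming distance.
      (_, dist, incoming) => math.min(dist, incoming),
      // Send a message along an edge only if it shortens the destination's distance.
      // The first check avoids overflowing Int.MaxValue + 1 on unreached sources.
      triplet =>
        if (triplet.srcAttr != Int.MaxValue && triplet.srcAttr + 1 < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        else
          Iterator.empty,
      // Merge messages that arrive at the same vertex in one superstep.
      (a, b) => math.min(a, b)
    )
  }
}
```

Calling BFSPregel.run(graph, sourceVertex) should return a graph whose vertex attribute is the number of hops from the source; vertices that cannot be reached keep Int.MaxValue. Every transformation here returns a new graph, so nothing is mutated inside a closure.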

Upvotes: 0
