Hamid Roghani

How to find intersection between a specific node and its neighbors in Spark GraphX with Scala

I am new to Spark GraphX and am trying to compute, in a distributed way, the intersection between the neighbors of a specific node (for example, the node with ID = 1) and the neighbors of each of its neighbors.

I have loaded the edge list with GraphLoader.edgeListFile(sc, "Path"). Then I find the neighbor IDs of node 1 with collectNeighborIds, and I map over them to find each neighbor's own neighbors and compute their intersection with the neighbors of the selected node (node 1). Here is the code:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val graph = GraphLoader.edgeListFile(sc, "path/to/edgelist")
val node_collect_neighborsId1 = graph.collectNeighborIds(EdgeDirection.Either).filter(x=> x._1 == 1)

val node1_neighbors_ID=node_collect_neighborsId1.flatMap(x=> x._2)

def compute_intersection (vertex :VertexId) = {


  var node2_collect_neighborsId: RDD[(VertexId, Array[VertexId])] = graph.collectNeighborIds(EdgeDirection.Either).filter(x=> x._1 == vertex)

  var node2_neighbors_ID=node2_collect_neighborsId.flatMap(x=> x._2)

  var intersect_two_node = node1_neighbors_ID.intersection(node2_neighbors_ID)

  (vertex, intersect_two_node)

}

val result = node1_neighbors_ID.map(compute_intersection)

I expect the result variable to contain one row per neighbor: the neighbor's vertex ID together with the common nodes of the two neighbor sets (the intersection). But I can't print it to see what is inside. Please help me compute the intersection and print the result.
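To make the expected result concrete, here is a minimal plain-Scala sketch (no Spark) that computes it straight from the definition on a small in-memory edge list. The object name NeighborIntersection and the sample edges are hypothetical; edge direction is ignored, as with EdgeDirection.Either.

```scala
// Plain Scala, no Spark: for each neighbor v of a target vertex,
// compute N(target) ∩ N(v) on a small in-memory edge list.
object NeighborIntersection {
  type VertexId = Long

  // Undirected neighbor sets (like EdgeDirection.Either); self-loops are dropped.
  def neighbors(edges: Seq[(VertexId, VertexId)]): Map[VertexId, Set[VertexId]] =
    edges
      .flatMap { case (s, d) => Seq(s -> d, d -> s) }
      .filter { case (a, b) => a != b }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  // One row per neighbor v of target: (v, common neighbors of target and v).
  def intersections(edges: Seq[(VertexId, VertexId)], target: VertexId): Map[VertexId, Set[VertexId]] = {
    val nbrs = neighbors(edges)
    val targetNbrs = nbrs.getOrElse(target, Set.empty[VertexId])
    targetNbrs
      .map(v => v -> (targetNbrs intersect nbrs.getOrElse(v, Set.empty[VertexId])))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample graph
    val edges = Seq(1L -> 2L, 1L -> 3L, 2L -> 3L, 3L -> 4L, 1L -> 4L, 2L -> 5L)
    intersections(edges, 1L).toSeq.sortBy(_._1).foreach(println)
  }
}
```

This only makes sense when the whole edge list fits in memory on one machine; it is a way to check results on small inputs, not a replacement for a distributed computation. Note that it keeps neighbors whose intersection is empty.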

Answers (1)

Aleksey Isachenkov

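You can't build a result of type RDD[RDD[T]]: Spark does not support nested RDDs, because RDD transformations and actions can only be invoked by the driver, not inside another RDD's transformation. Your compute_intersection calls graph.collectNeighborIds and intersection, so node1_neighbors_ID.map(compute_intersection) tries to build an RDD whose elements contain RDDs. Because map is lazy, the failure only shows up when you run an action on result (for example, to print it). Hence you shouldn't compute the intersection for each neighbor inside map.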

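Instead, you can compute the intersections for all of the target's neighbors at once, without nesting RDDs, using two aggregateMessages passes. The first pass marks the target's neighbors. The second pass sends, along every edge whose endpoints are both marked, each endpoint's ID to the other endpoint. A neighbor that shares no common neighbor with the target gets no entry in the result: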

import org.apache.spark.graphx._

def computeIntersection[VD, ED](graph: Graph[VD, ED], targetVertexId: VertexId): VertexRDD[List[VertexId]] = {
  //mark the target's neighbors (edges in either direction)
  val verticesWithTargetNeighborFlag = graph.aggregateMessages[Boolean](
    triplet => {
      if (triplet.srcId == targetVertexId && triplet.dstId != targetVertexId) {
        triplet.sendToDst(true)
      } else if (triplet.dstId == targetVertexId && triplet.srcId != targetVertexId) {
        triplet.sendToSrc(true)
      }
    },
    (msg1, msg2) => msg1 || msg2,
    TripletFields.None //only vertex IDs are read here
  )
  //attach the flag to every vertex; vertices that received no message are not neighbors
  val graphWithTargetNeighborFlag = graph.outerJoinVertices(verticesWithTargetNeighborFlag) {
    (_, _, flag) => flag.getOrElse(false)
  }
  //collect intersection vertices for each target's neighbor: an edge between two
  //flagged vertices makes each endpoint a common neighbor of the target and the other endpoint
  val verticesWithIntersection = graphWithTargetNeighborFlag.aggregateMessages[List[VertexId]](
    triplet => if (triplet.srcAttr && triplet.dstAttr) { //both are target's neighbors
      triplet.sendToDst(List(triplet.srcId))
      triplet.sendToSrc(List(triplet.dstId))
    },
    (msg1, msg2) => msg1 ::: msg2,
    TripletFields.All
  )
  verticesWithIntersection
}
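
The two passes can be checked without a cluster. The following plain-Scala sketch mirrors them on an in-memory edge list; the object name TwoPassIntersection is hypothetical, and a Seq of (src, dst) pairs stands in for the graph's edges.

```scala
// Plain Scala, no Spark: mirrors the two aggregateMessages passes on a Seq of edges.
object TwoPassIntersection {
  type VertexId = Long

  def computeIntersection(edges: Seq[(VertexId, VertexId)], target: VertexId): Map[VertexId, List[VertexId]] = {
    // Pass 1: flag the target's neighbors (edges in either direction).
    val isNeighbor: Set[VertexId] = edges.collect {
      case (src, dst) if src == target && dst != target => dst
      case (src, dst) if dst == target && src != target => src
    }.toSet

    // Pass 2: every edge whose endpoints are both flagged sends each
    // endpoint's ID to the other endpoint.
    val messages: Seq[(VertexId, List[VertexId])] = edges.flatMap { case (src, dst) =>
      if (isNeighbor(src) && isNeighbor(dst)) Seq(dst -> List(src), src -> List(dst))
      else Seq.empty
    }

    // Merge the messages per receiving vertex (the role of msg1 ::: msg2).
    messages.groupBy(_._1).map { case (v, ms) => v -> ms.flatMap(_._2).toList }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample graph
    val edges = Seq(1L -> 2L, 1L -> 3L, 2L -> 3L, 3L -> 4L, 1L -> 4L, 2L -> 5L)
    computeIntersection(edges, 1L).toSeq.sortBy(_._1).foreach(println)
  }
}
```

As in the GraphX version, a neighbor with no common neighbor gets no entry. If the edge list contains reciprocal edges such as (2, 3) and (3, 2), each ID is sent twice and appears twice in the list, so apply .distinct to each list if that matters.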

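And you can print RDD elements by first bringing them to the driver with collect (this loads the whole RDD into driver memory, so for large results print a sample with take(n) instead):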

rdd.collect().foreach(println)
