Reputation: 127
I am new to Spark GraphX and am trying to compute, in a distributed way, the intersection between the neighbors of a specific node (for example, the node with ID = 1) and the neighbors of each of its neighbors.
I loaded an edge list with GraphLoader.edgeListFile(sc, "Path"). I then find the neighbor IDs of node 1 with collectNeighborIds and map over them to find each neighbor's neighbors and intersect those with the neighbors of the selected node (the node with ID = 1). Here is the code:
val graph = GraphLoader.edgeListFile(sc,path to edgelist)
val node_collect_neighborsId1 = graph.collectNeighborIds(EdgeDirection.Either).filter(x=> x._1 == 1)
val node1_neighbors_ID=node_collect_neighborsId1.flatMap(x=> x._2)
def compute_intersection (vertex :VertexId) = {
  var node2_collect_neighborsId: RDD[(VertexId, Array[VertexId])] = graph.collectNeighborIds(EdgeDirection.Either).filter(x=> x._1 == vertex)
  var node2_neighbors_ID=node2_collect_neighborsId.flatMap(x=> x._2)
  var intersect_two_node = node1_neighbors_ID.intersection(node2_neighbors_ID)
  (vertex, intersect_two_node)
}
val result = node1_neighbors_ID.map(compute_intersection)
I expect the result variable to contain one row per neighbor: the neighbor's vertex ID together with the nodes common to that neighbor's neighbor set and node 1's neighbor set. However, I can't print the rows to see what is inside. Please help me compute the intersection and print the result.
Upvotes: 1
Views: 905
Reputation: 1240
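You can't build a result of type RDD[RDD[T]]. Spark does not support nested RDDs: operations such as intersection can only be invoked from the driver, not from inside a function passed to another RDD's transformation. Hence you shouldn't compute the intersection for each neighbor inside map.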
You can compute the intersection for all of the target's neighbors with aggregateMessages:
import scala.reflect.ClassTag
import org.apache.spark.graphx.{Graph, TripletFields, VertexRDD}

// ED needs a ClassTag because Graph(...) below rebuilds a graph from graph.edges
def computeIntersection[VD, ED: ClassTag](graph: Graph[VD, ED], targetVertexId: Long): VertexRDD[List[Long]] = {
  // Mark the target's neighbors (self-loops on the target are skipped)
  val verticesWithTargetNeighborFlag = graph.aggregateMessages[Boolean](
    triplet => {
      if (triplet.srcId == targetVertexId && triplet.dstId != targetVertexId) {
        triplet.sendToDst(true)
      } else if (triplet.dstId == targetVertexId && triplet.srcId != targetVertexId) {
        triplet.sendToSrc(true)
      }
    },
    (msg1, msg2) => msg1 || msg2,
    TripletFields.None
  )
  // Vertices that received no message are not neighbors of the target: default their flag to false
  val graphWithTargetNeighborFlag = Graph(verticesWithTargetNeighborFlag, graph.edges, false)
  // Collect the intersection vertices for each of the target's neighbors
  val verticesWithIntersection = graphWithTargetNeighborFlag.aggregateMessages[List[Long]](
    triplet => if (triplet.srcAttr && triplet.dstAttr) { // both endpoints are neighbors of the target
      triplet.sendToDst(List(triplet.srcId))
      triplet.sendToSrc(List(triplet.dstId))
    },
    (msg1, msg2) => msg1 ::: msg2,
    TripletFields.All
  )
  verticesWithIntersection
}
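To sanity-check what computeIntersection should return on a small edge list, the same computation can be modelled with plain Scala collections. This is only a rough sketch, not part of the GraphX solution: the object and method names (NeighborIntersectionModel, adjacency, commonNeighbors) are made up for illustration, it assumes the edge list fits in memory, and it ignores self-loops and duplicate edges, which the aggregateMessages version would report as repeated list entries.

```scala
// Plain-collections reference model (no Spark). All names here are hypothetical.
object NeighborIntersectionModel {
  type VertexId = Long

  // Undirected adjacency sets built from a directed edge list; self-loops are dropped.
  def adjacency(edges: Seq[(VertexId, VertexId)]): Map[VertexId, Set[VertexId]] =
    edges
      .filter { case (src, dst) => src != dst }
      .flatMap { case (src, dst) => Seq(src -> dst, dst -> src) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  // For each neighbor n of target: neighbors(n) intersected with neighbors(target).
  // Neighbors with an empty intersection are omitted, mirroring the fact that
  // aggregateMessages only returns vertices that received at least one message.
  def commonNeighbors(edges: Seq[(VertexId, VertexId)],
                      target: VertexId): Map[VertexId, Set[VertexId]] = {
    val adj = adjacency(edges)
    val targetNeighbors = adj.getOrElse(target, Set.empty[VertexId])
    targetNeighbors.iterator
      .map(n => n -> (adj(n) intersect targetNeighbors))
      .filter { case (_, common) => common.nonEmpty }
      .toMap
  }
}
```

For example, with the edges (1,2), (1,3), (2,3), (3,4), (1,4), (5,1) and target 1, the target's neighbors are 2, 3, 4 and 5. The model returns 2 -> Set(3), 3 -> Set(2, 4) and 4 -> Set(3); vertex 5 is omitted because it shares no neighbor with the target.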
And you can print the RDD's elements using collect:
rdd.collect().foreach(println)
Upvotes: 1