Reputation: 1580
This question is about Spark GraphX. I want to compute a subgraph by removing nodes that are neighbors of certain other nodes.
Example
[Task] Retain A nodes and B nodes that are not neighbors of C2 nodes.
Input graph:
                   ┌────┐
             ┌─────│ A  │──────┐
             │     └────┘      │
             v                 v
┌────┐     ┌────┐            ┌────┐     ┌────┐
│ C1 │────>│ B  │            │ B  │<────│ C2 │
└────┘     └────┘            └────┘     └────┘
             ^                 ^
             │     ┌────┐      │
             └─────│ A  │──────┘
                   └────┘
Output graph:
                   ┌────┐
             ┌─────│ A  │
             │     └────┘
             v
           ┌────┐
           │ B  │
           └────┘
             ^
             │     ┌────┐
             └─────│ A  │
                   └────┘
How can I elegantly write a GraphX query that returns the output graph?
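For anyone who wants to reproduce this, the input graph above could be built roughly as follows. This is only a sketch: the vertex ids (1 to 6), the edge attribute 1L, and the names ExampleGraph and build are assumptions made for illustration and are not part of the original problem.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}

object ExampleGraph {
  // Builds the six-vertex input graph from the diagram.
  // Vertex ids and the edge attribute (1L) are arbitrary, assumed values.
  def build(sc: SparkContext): Graph[String, Long] = {
    val vertices = sc.parallelize(Seq(
      (1L, "A"), (2L, "A"),   // the two A nodes
      (3L, "B"), (4L, "B"),   // left B (next to C1), right B (next to C2)
      (5L, "C1"), (6L, "C2")
    ))
    val edges = sc.parallelize(Seq(
      Edge(1L, 3L, 1L), Edge(1L, 4L, 1L), // top A -> both B's
      Edge(2L, 3L, 1L), Edge(2L, 4L, 1L), // bottom A -> both B's
      Edge(5L, 3L, 1L),                   // C1 -> left B
      Edge(6L, 4L, 1L)                    // C2 -> right B
    ))
    Graph(vertices, edges)
  }
}
```

In spark-shell, where sc already exists, `val graph = ExampleGraph.build(sc)` gives a Graph[String, Long] whose vertex attribute is the node label.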
Upvotes: 3
Views: 1606
Reputation: 1580
Here is another solution. This solution uses aggregateMessages to send an integer (1) to those B's that should be removed from the graph. The resulting vertex set is joined with the graph and a subsequent subgraph call removes the unwanted B's from the output graph.
// Step 1: send the message (1) to vertices that should be removed
val deleteMe = graph.aggregateMessages[Int](
  ctx => {
    // flag every B that is the destination of an edge coming from a C2
    if (ctx.dstAttr.equals("B") && ctx.srcAttr.equals("C2")) {
      ctx.sendToDst(1) // 1 means delete, but number is not actually used
    }
  },
  (a, b) => a // choose either message, they are all (1)
)
// Step 2: join vertex sets, original and deleteMe
val joined = graph.outerJoinVertices(deleteMe) {
  (id, origValue, msgValue) => msgValue match {
    case Some(number) => "deleteme" // vertex received msg
    case None => origValue
  }
}
// Step 3: Remove nodes with domain = deleteme (the predicate keeps everything else)
joined.subgraph(vpred = (id, data) => !data.equals("deleteme"))
I'm thinking of a way to use only one intermediate deletion flag, e.g. "deleteme", instead of both 1 and "deleteme". But this is as good as I could make it so far.
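One possible way to get by with a single flag is to send a Boolean, carry it next to the label as a pair, and strip it again after the subgraph call. The following is a minimal sketch under assumptions, not a tested production solution: the names SingleFlag and dropC2Neighbors are made up for illustration, the vertex attribute is assumed to be the String label, and, like the code above, it only looks at edges that point from a C2 to a B. It also drops the C vertices so that the result matches the output graph in the question.

```scala
import org.apache.spark.graphx.{Graph, VertexRDD}

object SingleFlag {
  // Hypothetical helper: keeps A vertices and those B vertices
  // that are not the destination of an edge from a C2 vertex.
  def dropC2Neighbors[ED](graph: Graph[String, ED]): Graph[String, ED] = {
    // Only flagged vertices appear in this VertexRDD; the flag is always true.
    val flagged: VertexRDD[Boolean] = graph.aggregateMessages[Boolean](
      ctx => if (ctx.srcAttr == "C2" && ctx.dstAttr == "B") ctx.sendToDst(true),
      (a, b) => a || b
    )
    graph
      // pair each label with its flag (false when no message arrived)
      .outerJoinVertices(flagged) { (id, label, flag) => (label, flag.getOrElse(false)) }
      // drop flagged B's and all C vertices; edges touching them go too
      .subgraph(vpred = (id, v) => !v._2 && !v._1.startsWith("C"))
      // restore the plain String label
      .mapVertices((id, v) => v._1)
  }
}
```

The sentinel string is gone, at the cost of one extra mapVertices pass to restore the original vertex type.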
Upvotes: 3
Reputation: 13927
A different way to find val nodesAB is to use GraphOps.collectNeighbors:
// nodesAB holds vertex ids only (RDD[VertexId]); the labels are dropped by map(_._1)
val nodesAB = graph.collectNeighbors(EdgeDirection.Either)
  .filter { case (vid, ns) => !ns.map(_._2).contains("C2") }.map(_._1)
  .intersection(
    graph.vertices
      .filter { case (vid, attr) => !attr.toString.startsWith("C") }.map(_._1)
  )
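The rest works the same way you had, except that nodesAB here holds only vertex ids, so the labels have to be joined back in before building the graph:
val solution1 = Graph(
  graph.vertices.join(nodesAB.map(id => (id, ()))).mapValues(_._1), // (id, label) for kept ids
  graph.edges
).subgraph(vpred = { case (id, label) => label != null })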
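If you want to use DataFrames, which might be more scalable, then first we need to turn nodesAB into a DataFrame:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
val newNodes = sqlContext.createDataFrame(
  nodesAB.map(id => Row(id)), // createDataFrame with a schema expects an RDD[Row]
  StructType(Array(StructField("newNode", LongType, false)))
)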
And you created an edges DataFrame with this:
val edgeDf = sqlContext.createDataFrame(
  graph.edges.map { edge => Row(edge.srcId, edge.dstId, edge.attr) },
  StructType(Array(
    StructField("srcId", LongType, false),
    StructField("dstId", LongType, false),
    StructField("attr", LongType, false)
  ))
)
You could then do this to create your graph without the subgraph:
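// the $"..." column syntax needs: import sqlContext.implicits._
val solution1 = Graph(
  graph.vertices.join(nodesAB.map(id => (id, ()))).mapValues(_._1), // (id, label) for kept ids
  edgeDf
    .join(newNodes, $"srcId" === $"newNode").select($"srcId", $"dstId", $"attr")
    .join(newNodes, $"dstId" === $"newNode")
    .rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getLong(2)))
)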
Upvotes: 3
Reputation: 1580
One solution is to use the triplet view to identify the subset of B nodes that are neighbors of C1 nodes. Next, union those with the A nodes. Finally, create a new Graph:
// Step 1
// Compute the subset of B's that are neighbors with C1
val nodesBC1 = graph.triplets .
  filter {trip => trip.srcAttr == "C1"} .
  map {trip => (trip.dstId, trip.dstAttr)}
// Step 2
// Union the subset B's with all the A's
val nodesAB = nodesBC1 .
  union(graph.vertices filter {case (id, label) => label == "A"})
// Step 3
// Create a graph using the subset nodes and all the original edges
// Remove nodes that have null values
val solution1 = Graph(nodesAB, graph.edges) .
  subgraph(vpred = {case(id, label) => label != null})
In step 1 I recreate a node RDD (containing the B nodes) by mapping together the dstId and dstAttr of the triplet view. I am not sure how efficient this is going to be for large graphs.
Upvotes: 0