ALs

Reputation: 509

Applying a function to graph data using mapReduceTriplets in Spark and GraphX

I'm having some problems applying mapReduceTriplets to my graph network in Spark using GraphX.

I've been following the tutorials and have read in my own data, which is put together as [Array[String], Int]. For example, my vertices are:

org.apache.spark.graphx.VertexRDD[Array[String]] e.g. (3999,Array(17, Low, 9))

And my edges are:

org.apache.spark.graphx.EdgeRDD[Int] e.g. Edge(3999,4500,1)

I'm trying to apply an aggregate-type function using mapReduceTriplets which counts whether the last integer in a vertex's array (9 in the example above) is the same as or different from the first integer (17 in the example above) of every connected vertex.

So you would end up with a list of counts of matches and non-matches.

The problem I am having is applying any function at all using mapReduceTriplets. I am quite new to Scala, so this may be really obvious, but the GraphX tutorials use an example graph in the format Graph[Double, Int], whereas my graph is in the format Graph[Array[String], Int]. So, as a first step, I'm just trying to figure out how I can use my graph in the example and then work from there.

The example on the graphx website is as follows:

    val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send message to destination vertex containing counter and age
          Iterator((triplet.dstId, (1, triplet.srcAttr)))
        } else {
          // Don't send a message for this triplet
          Iterator.empty
        }
      },
      // Add counter and age
      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
    )
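As a first step, the example above can be adapted to a Graph[Array[String], Int] by comparing elements of the attribute arrays instead of ages. A rough sketch of what I mean (assuming the last element of each array is the value to compare, as in my example, and that string equality is enough since the attributes are read in as Strings):

```scala
// Sketch only: count, per vertex, how many neighbours' first element
// matches this vertex's last element. Messages are (match, nonMatch) pairs.
val matchCounts: VertexRDD[(Int, Int)] = graph.mapReduceTriplets[(Int, Int)](
  triplet => {
    // Message for the source: its last element vs the destination's first
    val srcMsg = if (triplet.srcAttr.last == triplet.dstAttr.head) (1, 0) else (0, 1)
    // Message for the destination: its last element vs the source's first
    val dstMsg = if (triplet.dstAttr.last == triplet.srcAttr.head) (1, 0) else (0, 1)
    Iterator((triplet.srcId, srcMsg), (triplet.dstId, dstMsg))
  },
  // Reduce: sum the match and non-match counters per vertex
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
```

Note that `.last` and `.head` would throw on an empty array, so vertices without attributes would need guarding.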

Any advice would be most appreciated, or if you think there is a better way than using mapReduceTriplets I would be happy to hear it.

Edited new code

val nodes = sc.textFile("C~nodeData.csv")
  .map(line => line.split(","))
  .map(parts => (parts.head.toLong, parts.tail))

val edges = GraphLoader.edgeListFile(sc, "C:~edges.txt")

val graph = edges.outerJoinVertices(nodes) {
  case (uid, deg, Some(attrList)) => attrList
  case (uid, deg, None)           => Array.empty[String]
}


val countsRdd = graph.collectNeighbors(EdgeDirection.Either).join(graph.vertices).map {
  case (id, (neighbors, nodeAttr)) =>
    // join (rather than leftOuterJoin) avoids an unhandled Option on the attribute
    neighbors.map(_._2).count(x => x.last == nodeAttr(0))
}

Upvotes: 1

Views: 1903

Answers (1)

David Griffin

Reputation: 13927

I think you want to use GraphOps.collectNeighbors instead of either mapReduceTriplets or aggregateMessages.

collectNeighbors will give you an RDD with, for every VertexId in your graph, the connected nodes as an array. Just reduce the Array based on your needs. Something like:

val countsRdd = graph.collectNeighbors(EdgeDirection.Either)
  .join(graph.vertices)
  .map { case (vid, t) =>
    val neighbors = t._1
    val nodeAttr = t._2
    neighbors.map(_._2).filter( <add logic here> ).size
  }

If this doesn't get you going in the right direction, or you get stuck, let me know (the "<add logic here>" part, for example).
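For illustration, here is one way that logic might be filled in for your match count, assuming each vertex's last array element is compared against the first element of every neighbour's array (string equality, since the attributes are Strings):

```scala
// Sketch: per vertex, count neighbours whose first element equals
// this vertex's last element. headOption/lastOption guard empty arrays.
val countsRdd = graph.collectNeighbors(EdgeDirection.Either)
  .join(graph.vertices)
  .map { case (vid, (neighbors, nodeAttr)) =>
    (vid, neighbors.map(_._2).count(attr => attr.headOption == nodeAttr.lastOption))
  }
```

Keeping the vertex id in the result (the `(vid, count)` pair) lets you join the counts back to your graph later if you need to.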

Upvotes: 1
