Reputation: 439
I've started learning Spark, Scala, and GraphX in order to use Pregel. I found some simple code here: http://www.cakesolutions.net/teamblogs/graphx-pregel-api-an-example and wanted to try it, so I tried to compile it the way I thought it should be written (this is my first time with Scala):
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
class Graph[VD, ED] (
    val vertices: VertexRDD[VD],
    val edges: EdgeRDD[ED]) {
}
object SimpleApp {
  val initialMsg = 9999
  def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
    if (message == initialMsg)
      value
    else
      (message min value._1, value._1)
  }
  def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
    val sourceVertex = triplet.srcAttr
    if (sourceVertex._1 == sourceVertex._2)
      Iterator.empty
    else
      Iterator((triplet.dstId, sourceVertex._1))
  }
  def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val minGraph = graph.pregel(initialMsg,
      Int.MaxValue,
      EdgeDirection.Out)(
      vprog,
      sendMsg,
      mergeMsg)
    minGraph.vertices.collect.foreach{
      case (vertexId, (value, original_value)) => println(value)
    }
    sc.stop()
  }
}
But I get this error:
$ C:\"Program Files (x86)"\sbt\bin\sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] Set current project to Simple Project (in build file:/C:/spark/simple/)
[info] Compiling 1 Scala source to C:\spark\simple\target\scala-2.10\classes...
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:42: not found: type VD
[error] : Graph[VD, ED]
[error] ^
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:42: not found: type ED
[error] : Graph[VD, ED]
[error] ^
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:39: not found: type VD
[error] (vprog: (VertexId, VD, A) => VD,
[error] ^
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:39: not found: type VD
[error] (vprog: (VertexId, VD, A) => VD,
[error] ^
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:40: not found: type VD
[error] sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
[error] ^
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:40: not found: type ED
[error] sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
[error] ^
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:35: only classes can have declared but undefined members
[error] def pregel[A]
[error] ^
[error] C:\spark\simple\src\main\scala\SimpleApp.scala:48: not found: value graph
[error] val minGraph = graph.pregel(initialMsg,
[error] ^
[error] 8 errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 5 s, completed Jan 17, 2017 12:35:00 AM
I'm fairly new to Scala, so I don't really understand the role of VD and ED in these definitions.
Upvotes: 1
Views: 4852
Reputation: 9698
The problem is that a method definition cannot use a type that hasn't been declared as a type parameter somewhere in scope (e.g. on that method or on a containing class).
Look at your method def pregel[A]. It returns a value of type Graph[VD, ED]. But how can the compiler know what VD refers to? Without getting into what your code does, you can fix these "not found: type" errors by adding VD (and likewise ED, which the compiler also reports as not found) as method type parameters:
def pregel[A, VD, ED]
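To illustrate the scoping rule in plain Scala, with no Spark dependency, here is a minimal sketch. MiniGraph, mapVertices, and relabel are hypothetical names invented for this example and are not part of GraphX. Inside the class, VD and ED are already in scope; outside it, the method has to declare every type parameter its signature mentions.

```scala
// MiniGraph is a hypothetical stand-in, not the real org.apache.spark.graphx.Graph.
final case class MiniGraph[VD, ED](
    vertices: Map[Long, VD],
    edges: Seq[(Long, Long, ED)]) {

  // Inside the class, VD and ED are already in scope, so only VD2 is declared here.
  def mapVertices[VD2](f: (Long, VD) => VD2): MiniGraph[VD2, ED] =
    MiniGraph(vertices.map { case (id, attr) => id -> f(id, attr) }, edges)
}

object TypeParamScope {
  // Outside the class, every type the signature mentions must be declared on the method.
  // Removing VD or ED from the list below reproduces the "not found: type" error.
  def relabel[A, VD, ED](g: MiniGraph[VD, ED], label: A): MiniGraph[(A, VD), ED] =
    g.mapVertices((_, attr) => (label, attr))

  def main(args: Array[String]): Unit = {
    // Made-up sample data: two Int vertices joined by one Boolean edge.
    val g = MiniGraph(Map(1L -> 3, 2L -> 6), Seq((1L, 2L, true)))
    println(relabel(g, "seen").vertices)
  }
}
```

Note that declaring the type parameters only resolves the "not found: type" errors. A method in an object still needs a body, which is what the "only classes can have declared but undefined members" message is about.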
Note that if pregel were a method within the class Graph, it would be OK, because Graph defines those types: class Graph[VD, ED]. By the code you posted, it would seem that your methods are lingering in the middle of nowhere, which is not allowed. Perhaps you may want to consider moving them inside the Graph class?
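If the goal is just to run the blog's example, another route that may be simpler is to drop the hand-written Graph class and the pasted pregel signature altogether, since org.apache.spark.graphx already ships a Graph type whose pregel operator takes these same arguments. The sketch below assumes that is the intent. The vertex and edge values are made-up sample data, not taken from the question, and the app is assumed to be launched with spark-submit so that the master is supplied externally.

```scala
/* SimpleApp.scala */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object SimpleApp {
  val initialMsg = 9999

  // Vertex program: keep the value on the initial message, otherwise take the minimum.
  def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) =
    if (message == initialMsg) value
    else (message min value._1, value._1)

  // Send the source's current value downstream only if it changed in the last step.
  def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
    val sourceVertex = triplet.srcAttr
    if (sourceVertex._1 == sourceVertex._2) Iterator.empty
    else Iterator((triplet.dstId, sourceVertex._1))
  }

  def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    // Assumption: illustrative sample data. Here VD = (Int, Int) and ED = Boolean.
    val vertices: RDD[(VertexId, (Int, Int))] =
      sc.parallelize(Seq((1L, (7, -1)), (2L, (3, -1)), (3L, (2, -1)), (4L, (6, -1))))
    val edges: RDD[Edge[Boolean]] =
      sc.parallelize(Seq(Edge(1L, 2L, true), Edge(1L, 4L, true),
        Edge(2L, 4L, true), Edge(3L, 1L, true), Edge(3L, 4L, true)))

    // GraphX's own Graph; VD and ED are inferred from the RDD element types.
    val graph = Graph(vertices, edges)

    val minGraph = graph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(
      vprog, sendMsg, mergeMsg)

    minGraph.vertices.collect.foreach {
      case (vertexId, (value, originalValue)) => println(value)
    }
    sc.stop()
  }
}
```

Here VD and ED never appear in user code: they are type parameters of GraphX's Graph, and the compiler fills them in as (Int, Int) and Boolean from the data used to build the graph.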
Upvotes: 2