Reputation: 75
I am trying out a code in Spark GraphX and having difficulty with Null.
scala> verticesRDD
res76: org.apache.spark.rdd.RDD[(Long, (String, Long))] = MapPartitionsRDD[78] at map at <console>:51
scala> EdgesRDD
res77: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Boolean]] = MapPartitionsRDD[18] at map at <console>:41
val graph = Graph(verticesRDD, EdgesRDD).cache()
scala> graph
res75: org.apache.spark.graphx.Graph[(String, Long),Boolean] = org.apache.spark.graphx.impl.GraphImpl@9533103
If i pull the vertices properties i am getting some null values.
val x = graph.vertices.map{case(id, v) => v}
scala> x
res78: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[149] at map at <console>:56
scala> x.filter(_ == null).count()
res79: Long = 8999
Where as in the source verticesRDD there is no NUll.
val x = verticesRDD.map{case(id,v) => v}
scala> x
res80: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[151] at map at <console>:54
scala> x.filter(_ == null).count()
res81: Long = 0
I am not able to understand why vertices values can be null when in source RDD for vertices there is no null in the values?
i would be really helpful if you could provide some insight about this.
thanks
Upvotes: 2
Views: 469
Reputation: 41957
When VertexIds
of verticesRDD
and EdgesRDD
don't match, then a null vertex is created for the non matching vertexId. And thats the reason you have null in the Graph
despite the fact that you don't have null in verticesRDD
.
It will be more evident with simple example
scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> val verticesRDD: RDD[(Long, (String, Long))] = sc.parallelize(Seq((0L, ("Subhasis", 0L))))
verticesRDD: org.apache.spark.rdd.RDD[(Long, (String, Long))] = ParallelCollectionRDD[0] at parallelize at <console>:28
scala> val EdgesRDD: RDD[Edge[Boolean]] = sc.parallelize(Seq(Edge(1L, 0L, true)))
EdgesRDD: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Boolean]] = ParallelCollectionRDD[1] at parallelize at <console>:28
scala> val graph = Graph(verticesRDD, edgesRDD)
graph: org.apache.spark.graphx.Graph[(String, Long),Boolean] = org.apache.spark.graphx.impl.GraphImpl@5563a63f
scala> graph.vertices.foreach(println)
[Stage 2:> (0 + 0) / 4](1,null)
(0,(Subhasis,0))
18/04/20 08:26:22 WARN Executor: 1 block locks were not released by TID = 9:
[rdd_9_3]
18/04/20 08:26:22 WARN Executor: 1 block locks were not released by TID = 8:
[rdd_9_2]
18/04/20 08:26:22 WARN Executor: 1 block locks were not released by TID = 10:
[rdd_9_0]
18/04/20 08:26:22 WARN Executor: 1 block locks were not released by TID = 11:
[rdd_9_1]
scala>
You can clearly see that (1,null)
is created for non matching vertexId of EdgesRDD in the graph
I hope the explanation is clear and helpful
Upvotes: 2