Subhasis

Reputation: 75

GraphX Vertices populated with Null values

I am trying out some code in Spark GraphX and am having difficulty with null values.

scala> verticesRDD
res76: org.apache.spark.rdd.RDD[(Long, (String, Long))] = MapPartitionsRDD[78] at map at <console>:51

scala> EdgesRDD
res77: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Boolean]] = MapPartitionsRDD[18] at map at <console>:41

val graph = Graph(verticesRDD, EdgesRDD).cache()

scala> graph
res75: org.apache.spark.graphx.Graph[(String, Long),Boolean] = org.apache.spark.graphx.impl.GraphImpl@9533103

If I pull the vertex properties, I get some null values.

val x = graph.vertices.map{case(id, v) => v}

scala> x
res78: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[149] at map at <console>:56

scala> x.filter(_ == null).count()
res79: Long = 8999

Whereas in the source verticesRDD there are no nulls.

val x = verticesRDD.map{case(id,v) => v}

scala> x
res80: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[151] at map at <console>:54

scala> x.filter(_ == null).count()
res81: Long = 0

I am not able to understand why the vertex values can be null in the graph when there are no nulls in the values of the source vertices RDD.

I would be really grateful if you could provide some insight into this.

Thanks.

Upvotes: 2

Views: 469

Answers (1)

Ramesh Maharjan

Reputation: 41957

When the vertex ids referenced in EdgesRDD don't all appear in verticesRDD, a null vertex attribute is created for each non-matching vertex id. That's why you have nulls in the graph even though there are no nulls in verticesRDD.

It becomes more evident with a simple example:

scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> val verticesRDD: RDD[(Long, (String, Long))] = sc.parallelize(Seq((0L, ("Subhasis", 0L))))
verticesRDD: org.apache.spark.rdd.RDD[(Long, (String, Long))] = ParallelCollectionRDD[0] at parallelize at <console>:28

scala> val EdgesRDD: RDD[Edge[Boolean]] = sc.parallelize(Seq(Edge(1L, 0L, true)))
EdgesRDD: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Boolean]] = ParallelCollectionRDD[1] at parallelize at <console>:28

scala> val graph = Graph(verticesRDD, EdgesRDD)
graph: org.apache.spark.graphx.Graph[(String, Long),Boolean] = org.apache.spark.graphx.impl.GraphImpl@5563a63f

scala> graph.vertices.foreach(println)
(1,null)
(0,(Subhasis,0))

You can clearly see that (1,null) is created in the graph for the non-matching vertex id 1 coming from EdgesRDD.
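If the nulls are unwanted, `Graph.apply` also accepts a default vertex attribute as a third argument, e.g. `Graph(verticesRDD, EdgesRDD, defaultAttr)`, which is used for every edge endpoint missing from verticesRDD; alternatively you can filter the nulls out of `graph.vertices` afterwards. The matching behaviour can be sketched in plain Scala without Spark (the names `vertexAttrs`, `edgeEndpoints` and `defaultAttr` are illustrative, not GraphX API):

```scala
// Every vertex id referenced by an edge is resolved against the vertex
// attributes; ids missing from verticesRDD resolve to null unless a
// default vertex attribute is supplied.
val vertexAttrs: Map[Long, (String, Long)] = Map(0L -> ("Subhasis", 0L))
val edgeEndpoints = Seq(1L, 0L) // src and dst of Edge(1L, 0L, true)

// Without a default: the missing id 1L resolves to null.
val withNull = edgeEndpoints.map(id => id -> vertexAttrs.getOrElse(id, null))

// With a default attribute, as Graph(verticesRDD, EdgesRDD, defaultAttr) would do:
val defaultAttr = ("unknown", -1L)
val withDefault = edgeEndpoints.map(id => id -> vertexAttrs.getOrElse(id, defaultAttr))

println(withNull)    // List((1,null), (0,(Subhasis,0)))
println(withDefault) // List((1,(unknown,-1)), (0,(Subhasis,0)))
```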

I hope the explanation is clear and helpful.

Upvotes: 2
