kaxil

Reputation: 18844

Using neo4j-spark-connector to find specific nodes rather than count & save result in RDD

How can we get the result of the following query into an RDD using neo4j-spark-connector for Apache Spark?

MATCH (n)-[r]-()
WITH n AS Nodes, COUNT(Distinct r) as Degree
RETURN Degree, count(Nodes) 
ORDER BY Degree ASC

The example on GitHub only shows how to return a count of the nodes:

import org.neo4j.spark._
Neo4jRowRDD(sc, "MATCH (n) WHERE id(n) < {maxId} RETURN id(n)", Seq(("maxId", 100000))).count

Can we not load the result into an RDD and inspect it with .collect() in Spark? When I try to do this, I get the following error:

scala> xyz.take(2)
16/09/19 15:04:46 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 1)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/09/19 15:04:46 ERROR TaskSetManager: Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 1); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 1)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1305)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1279)
  ... 50 elided

Upvotes: 1

Views: 361

Answers (1)

Michael Hunger

Reputation: 41676

Sure. You get an RDD back, so you can run any RDD operation on it, including .collect().

By the way, the connector has been updated for Spark 2.0 and has a new API.
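As a rough sketch for the degree query in the question: the stack trace points at org.neo4j.driver.internal.InternalNode, which means the row held a whole node, and that class cannot be serialized back to the driver. Returning only scalar values (here, two integers) should avoid the problem. This assumes the same Neo4jRowRDD(sc, query, params) call shown in the question and a spark-shell whose sc is already configured to reach Neo4j. The NodeCount alias and the degreeQuery / degreeRdd names are only illustrative.

```scala
import org.neo4j.spark._

// Assumption: `sc` is the spark-shell SparkContext, already configured
// with the Neo4j connection settings.

// Degree distribution query from the question. It returns two integers per
// row rather than node objects, so the rows can be serialized by Spark.
val degreeQuery =
  """MATCH (n)-[r]-()
    |WITH n AS Nodes, count(DISTINCT r) AS Degree
    |RETURN Degree, count(Nodes) AS NodeCount
    |ORDER BY Degree ASC""".stripMargin

// Same call shape as the example in the question; this query takes no parameters.
val degreeRdd = Neo4jRowRDD(sc, degreeQuery, Seq.empty)

// Bring the rows back to the driver and print them.
degreeRdd.collect().foreach(println)
```

If you need data from the nodes themselves, returning id(n) or individual properties instead of n keeps each row made of plain values.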

Upvotes: 1
