Erp12

Reputation: 652

Deserialization of task fails only in test runner

I am encountering a deserialization issue that only shows up when I run my code via a test runner. Running an assembled uberjar (with AOT compilation) does not show this behavior, and neither does running the same code from a REPL. The test runners I have tried are cognitect/test-runner and lambdaisland/kaocha.

My general question is: why would (de)serialization behave differently in a test runner versus the REPL or an uberjar?

I initially suspected that AOT compilation was causing the difference, but the REPL also runs without AOT compilation, so I would expect to see the exception there as well.
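
One thing I looked at (a small sketch; my.ns.CljMapFunction is a placeholder for the actual generated class of the deftype shown further down) is which classloader provides the class in each context, since classes compiled at runtime come from clojure.lang.DynamicClassLoader while AOT-compiled classes come from the application classloader:

;; Sketch: print the classloader that loaded the generated class.
;; my.ns.CljMapFunction is a placeholder for the real class name.
(println (.getClassLoader my.ns.CljMapFunction))
;; At the REPL or under a test runner this typically prints a
;; clojure.lang.DynamicClassLoader; in an AOT-compiled uberjar it
;; is the application classloader.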

Below are the relevant parts of the exception stack trace:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
    ... trimmed for easier reading ...
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

More details in case it is helpful: my Clojure application uses Spark via Scala/Java interop. The deserialization exception happens when an executor/worker process receives a new task in the form of a MapPartitionsRDD object.
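
Since Spark's JavaSerializer (visible in the trace) is plain Java serialization under the hood, one way I tried to isolate the problem outside Spark is a simple round-trip helper (a sketch, not my production code):

(import '(java.io ByteArrayOutputStream ObjectOutputStream
                  ByteArrayInputStream ObjectInputStream))

(defn round-trip
  "Serialize obj with plain Java serialization and read it back,
   roughly what JavaDeserializationStream does on the executor."
  [obj]
  (let [baos (ByteArrayOutputStream.)]
    (with-open [oos (ObjectOutputStream. baos)]
      (.writeObject oos obj))
    (with-open [ois (ObjectInputStream.
                     (ByteArrayInputStream. (.toByteArray baos)))]
      (.readObject ois))))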

Below is a simple deftype I created that extends the MapFunction interface provided by Spark. This interface allows a function to be serialized and broadcast to worker processes, and it is leveraged by MapPartitionsRDD, although I am not sure exactly how.

(deftype CljMapFunction [func]
  org.apache.spark.api.java.function.MapFunction
  (call [_ var1]
    (-> var1 row/row->map func row/to-row))) ; The row/* functions are my own.

When I pass an instance of CljMapFunction to the Java-specific map method of a Spark Dataset, I see the above exception if the code is executed by a test runner. Again, both the assembled uberjar and running from the REPL behave as expected.
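
For reference, the call site looks roughly like this (ds, my-func, and my-encoder are placeholders, not my exact code; Dataset.map(MapFunction, Encoder) is the Java-interop overload I am using):

(import '(org.apache.spark.sql Dataset))

;; Sketch of the call site; my-encoder is whatever Encoder the
;; resulting rows need (placeholder).
(defn map-rows ^Dataset [^Dataset ds my-func my-encoder]
  (.map ds (->CljMapFunction my-func) my-encoder))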

Upvotes: 1

Views: 184

Answers (0)
