Reputation: 652
I am encountering a deserialization issue that only shows up when I run my code via a test runner. Running an assembled uberjar (with AOT compilation) does not show this behavior, and neither does running the same code from a REPL. The test runners I have tried are cognitect/test-runner and lambdaisland/kaocha.
My general question is: Why would serialization behave differently when running under a test runner versus from the REPL or an uberjar?
I initially suspected that AOT compilation was causing the difference, but I would expect to see the exception in the REPL as well.
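To probe the AOT theory, the kind of check I can run in each environment (REPL, uberjar, test runner) is a quick diagnostic like the sketch below; it just prints which class and defining classloader back a compiled object, so I can see whether the bytecode comes from an AOT-compiled class file or from Clojure's dynamic classloader. It is only an illustrative check, not part of my build.

(defn describe-class
  "Return the class name and defining classloader of x, to compare
   how the same code is loaded in each environment."
  [x]
  (let [c (class x)]
    {:class  (.getName c)
     :loader (str (.getClassLoader c))}))

;; e.g. (describe-class (fn [x] x)) in the REPL, in a test, and in the uberjar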
Below are the relevant parts of the exception stack trace:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
... trimmed for easier reading ...
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
More details in case it is helpful: my Clojure application uses Spark via Scala/Java interop. The deserialization exception happens when an executor/worker process receives a new task in the form of a MapPartitionsRDD object.
Below is a simple deftype I created that implements the MapFunction interface provided by Spark. This interface allows a function to be serialized and broadcast to worker processes, and it is leveraged by MapPartitionsRDD, although I am not sure how.
(deftype CljMapFunction [func]
  org.apache.spark.api.java.function.MapFunction
  (call [_ var1]
    (-> var1 row/row->map func row/to-row))) ; The row/* functions are my own.
When I pass an instance of CljMapFunction to the Java-specific map method of a Spark Dataset, I see the above exception if the code is executed by a test runner. Again, both the assembled uberjar and running from the REPL behave as expected.
Upvotes: 1
Views: 184