Reputation: 11
I have been searching Google and Stack Overflow for a week and still cannot find a good answer to this.
I have a chemical compound dataset, and I need to use a third-party JAR to read these compounds from SDF (a JSON-like data format) and then calculate the similarities between different compounds. Reading and calculating involve very complicated chemical details, so I cannot reimplement the functions myself. That is, I have to call the third-party JAR inside the mapping function on Spark. The JAR is called JCompoundMapper. It reads the atomic bonds iteratively using a DFS algorithm and seems to be very complicated. Anyway, this thread is not about reading chemical compounds but about how to use a third-party JAR inside a Spark map. When I tried to do this, I ran into the "Task not serializable" issue:
import de.zbit.jcmapper.distance.DistanceTanimoto
import de.zbit.jcmapper.distance.IDistanceMeasure
import de.zbit.jcmapper.fingerprinters.EncodingFingerprint
import de.zbit.jcmapper.fingerprinters.features.FeatureMap
import de.zbit.jcmapper.fingerprinters.features.IFeature
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DCATS
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DECFP
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintCSV
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintTABUnfolded
import de.zbit.jcmapper.io.writer.ExporterLinear
import de.zbit.jcmapper.io.writer.ExporterSDFProperty
import java.io.File
import java.io.FileWriter
import java.util.ArrayList
import java.util.List
// The reader is created the same way as in the Holder version further down.
val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))

// Compares the two precomputed fingerprints; id1 and id2 are not used yet.
def getSimilarity(id1: Int, id2: Int): Double = {
  val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
  featureMaps.add(new FeatureMap(rawFeatures))
  featureMaps.add(new FeatureMap(rawFeatures2))
  similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
}

// This map call is what triggers the exception below.
val func = combinations.map(x => {
  getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 48 elided
Caused by: java.io.NotSerializableException: de.zbit.jcmapper.io.reader.RandomAccessMDLReader
I read other threads and understand that I have to put the variables and functions in an object to get around the serialization problem. However, when I do this, I get a NullPointerException:
object Holder {
  val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
  val similarity: IDistanceMeasure = new DistanceTanimoto()
  val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
  val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
  val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))

  // Compares the two precomputed fingerprints; id1 and id2 are not used yet.
  def getSimilarity(id1: Int, id2: Int): Double = {
    val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
    featureMaps.add(new FeatureMap(rawFeatures))
    featureMaps.add(new FeatureMap(rawFeatures2))
    similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
  }
}

// This map call is what triggers the exception below.
val func = combinations.map(x => {
  Holder.getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-245-2-223.ec2.internal, executor 1): java.lang.NullPointerException
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.setRanges(Unknown Source)
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.<init>(Unknown Source)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Holder$.<init>(<console>:78)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder$lzycompute(<console>:77)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder(<console>:77)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:85)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
For the reading part, I can use a huge LinkedHashMap and store all the compounds there. However, I still have to use the getSimilarity() function to calculate the similarity using the third-party JAR. So even if I only use the getSimilarity() function, I get the NullPointerException if I put it inside an object, and the "Task not serializable" issue if I put it outside the object. Therefore, I have a couple of questions that I have not been able to find good answers to:
Thank you very much for all of your answers! I really appreciate your help!
Best, Michael
Upvotes: 1
Views: 226
Reputation: 1738
I think the problem is in this line:
val reader:RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
Because this code is placed in an object, each JVM that your Spark job runs on has to initialize it. So, in effect, this code is trying to read the file datasets/internal.sdf from the local file system of whichever machine that JVM runs on in your Spark cluster. Is that file available everywhere?
If you are not ready to put the file everywhere, you can try putting it on the classpath and reading it as a resource.
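A minimal sketch of the resource idea, under some assumptions: the SDF file is packaged in a JAR that is on the executors' classpath under the hypothetical name /internal.sdf, and RandomAccessMDLReader only needs a java.io.File, as in the question's code. The SdfResource object and its toTempFile helper are names made up for this illustration; they are not part of JCompoundMapper or Spark. Since a classpath resource is not necessarily a plain file, the helper copies it to a temporary file first:

```scala
import java.io.{File, InputStream}
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical helper, not part of any library.
object SdfResource {
  // Copies a classpath resource to a temporary file and returns that file.
  // Fails with IllegalArgumentException if the resource cannot be found.
  def toTempFile(resourceName: String): File = {
    val in: InputStream = getClass.getResourceAsStream(resourceName)
    require(in != null, s"Resource not found on classpath: $resourceName")
    try {
      val tmp = Files.createTempFile("compounds-", ".sdf")
      Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING)
      val file = tmp.toFile
      file.deleteOnExit() // clean up when the JVM exits
      file
    } finally {
      in.close()
    }
  }
}

// Possible use inside the question's Holder object (resource name is an assumption):
//   lazy val reader = new RandomAccessMDLReader(SdfResource.toTempFile("/internal.sdf"))
```

With this, each executor JVM would build its own reader from its own copy of the data the first time Holder is touched, instead of relying on datasets/internal.sdf existing at the same relative path on every node. Whether the resource is actually visible to the class loader depends on how the JAR is shipped to the cluster, so that part needs checking in your setup.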
Upvotes: 2