Stephane Maarek

Reputation: 5352

Apache Spark: Scala XML transformation not serializable?

I have a bunch of xmls with a DTD header and I'm trying to load all of them, ignoring the DTD.

val input = sc.wholeTextFiles("""\path\*.nxml""")

val saxfac = SAXParserFactory.newInstance();
saxfac.setValidating(false);
saxfac.setFeature("http://xml.org/sax/features/validation", false);
saxfac.setFeature("http://apache.org/xml/features/nonvalidating/load-dtd-grammar", false);
saxfac.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
saxfac.setFeature("http://xml.org/sax/features/external-general-entities", false);
saxfac.setFeature("http://xml.org/sax/features/external-parameter-entities", false);

val loadnode = input.map { case (k,v) => xml.XML.withSAXParser(saxfac.newSAXParser()).loadString(v)}
println(loadnode.count())

I end up with a strange error (apparently due to the SAX parser). What am I doing wrong?

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.rdd.RDD.map(RDD.scala:271)
    at graphXtutorial.PubMedMainApp$.main(PubMedMainApp.scala:59)
    at graphXtutorial.PubMedMainApp.main(PubMedMainApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.io.NotSerializableException: scala.xml.XML$$anon$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 10 more

Upvotes: 0

Views: 889

Answers (2)

Nilanjan Sarkar

Reputation: 192

I know I am late by years, but I came across this post during my own struggles, so I thought I'd share my solution.

class XMLParser extends Serializable {
  // Field is assumed to be a case class defined elsewhere with five String members.
  @transient lazy val parseXml = (xmlString: String) => {
    if (null != xmlString && xmlString.startsWith("<")) {
      val parsedElem = scala.xml.XML.loadString(xmlString)
      val fields = parsedElem \ "field"
      fields.map(node =>
        Field((node \ "name").text, (node \ "key").text, (node \ "description").text,
          (node \ "fullPathKey").text, (node \ "value").text))
    } else {
      Nil
    }
  }
}

The way to get around the task-not-serializable issue in general is to mark the non-serializable code as a @transient lazy val and encapsulate it within a serializable class. That way Spark does not serialize the variable at all, but only builds it once per executor.
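A minimal usage sketch of this pattern, assuming the XMLParser class above, a hypothetical Field case class with five String members, and a hypothetical RDD[String] of XML records named xmlRdd:

case class Field(name: String, key: String, description: String,
                 fullPathKey: String, value: String)

val parser = new XMLParser                       // cheap to serialize: parseXml is @transient
val fieldsRdd = xmlRdd.flatMap(parser.parseXml)  // the parser function is rebuilt lazily on each executor
fieldsRdd.take(5).foreach(println)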

Upvotes: 0

lmm

Reputation: 17431

Spark tasks have to be Java-serializable so that they can be shipped to other cluster nodes to run. Try constructing the parser inside the map, so that you're not trying to share a single parser instance across every cluster node. Better still, use something like mapPartitions so that you construct one parser instance per partition; constructing one for every record is probably a lot of overhead.
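A minimal sketch of the mapPartitions version, reusing the input RDD and SAXParserFactory settings from the question (abbreviated here, and assuming the same imports):

val loadnode = input.mapPartitions { iter =>
  // Build the factory, parser and loader once per partition, on the executor,
  // so nothing non-serializable has to be shipped from the driver.
  val saxfac = SAXParserFactory.newInstance()
  saxfac.setValidating(false)
  saxfac.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
  val loader = xml.XML.withSAXParser(saxfac.newSAXParser())
  iter.map { case (_, v) => loader.loadString(v) }
}
println(loadnode.count())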

Upvotes: 2
