pferrel

Reputation: 5702

Spark broadcast/serialize error

I’ve created a CLI driver for a Spark version of the Mahout "item similarity" job, with several tests that all pass on local[4] standalone Spark. The code even reads from and writes to clustered HDFS. But switching to clustered Spark produces an error that seems tied to a broadcast and/or serialization.

The code uses HashBiMap, from Google's Guava library. Two of these are created for every Mahout drm (a distributed matrix), for bi-directional lookup of row and column IDs. They are created once and then broadcast so they are accessible everywhere and not recalculated.

When I run this on clustered Spark I get the error below. At one point we were using HashMaps and they seemed to work on the cluster, so I suspect the HashBiMap is causing the problem, possibly through serialization in the broadcast. Here is a snippet of the code and the error.

 // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
 // broadcast them for access in distributed processes, so they are not recalculated in every task.
 // rowIDDictionary is a HashBiMap[String, Int]
 val rowIDDictionary = asOrderedDictionary(rowIDs) // this creates the HashBiMap in a non-distributed manner
 val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)

 val columnIDDictionary = asOrderedDictionary(columnIDs) // this creates the HashBiMap in a non-distributed manner
 val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)

 val indexedInteractions =
   interactions.map { case (rowID, columnID) =>   //<<<<<<<<<<< this is the stage being submitted before the error
     val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
     val columnIndex = columnIDDictionary_bcast.value.get(columnID).get

     rowIndex -> columnIndex
   }

The error seems to happen while executing interactions.map, when accessing the _bcast vals. Any idea where to start looking for this? (One isolation test I'm considering is sketched after the stack trace.)

14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83), which has no missing parents
14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83)
14/06/26 11:23:36 INFO scheduler.TaskSchedulerImpl: Adding task set 9.0 with 2 tasks
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:0 as TID 16 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:0 as 2418 bytes in 0 ms
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:1 as TID 17 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:1 as 2440 bytes in 0 ms
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Lost TID 16 (task 9.0:0)
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
    at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
    at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
    at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:102)
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:69)
    at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:138)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1814)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
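
To check whether Kryo alone can round-trip a HashBiMap, independent of Spark and the broadcast, something like this untested standalone sketch is what I had in mind (the object name and sample entries are just for illustration):

 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.google.common.collect.HashBiMap

 object HashBiMapKryoCheck {
   def main(args: Array[String]): Unit = {
     // build a tiny dictionary like the row/column ID BiMaps
     val dictionary = HashBiMap.create[String, Integer]()
     dictionary.put("itemA", Integer.valueOf(0))
     dictionary.put("itemB", Integer.valueOf(1))

     // write it with Kryo ...
     val kryo = new Kryo()
     val bytes = new ByteArrayOutputStream()
     val output = new Output(bytes)
     kryo.writeClassAndObject(output, dictionary)
     output.close()

     // ... and read it back; the cluster NPE above comes from Kryo's
     // MapSerializer during a read like this one
     val input = new Input(new ByteArrayInputStream(bytes.toByteArray))
     val copy = kryo.readClassAndObject(input)
     input.close()
     println(copy)
   }
 }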

Upvotes: 0

Views: 1675

Answers (1)

Holden

Reputation: 7442

It looks like you are using Kryo serialization; are you using it in your local tests as well? You may wish to explicitly register the class with Kryo so that it uses Java serialization, if Kryo's own serialization of HashBiMap doesn't succeed.
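
A minimal sketch of that suggestion, assuming a custom Spark KryoRegistrator (the class name HashBiMapRegistrator and the config wiring are illustrative, not from the original code):

 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.serializers.JavaSerializer
 import com.google.common.collect.HashBiMap
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer.KryoRegistrator

 // Register HashBiMap with Kryo but delegate it to plain Java serialization,
 // sidestepping Kryo's generic MapSerializer (the put() path in the stack trace above).
 class HashBiMapRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo): Unit = {
     kryo.register(classOf[HashBiMap[String, Int]], new JavaSerializer())
   }
 }

 // Point Spark at the registrator before creating the context.
 val conf = new SparkConf()
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryo.registrator", "HashBiMapRegistrator")

Guava's HashBiMap is java.io.Serializable, so the Java fallback should be able to round-trip it even where Kryo's MapSerializer does not.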

Upvotes: 2
