Reputation: 2230
The exception message is as follows:
User class threw exception: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.215.155.82): java.lang.NullPointerException
    at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
    at org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:676)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:521)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:625)
    at org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:328)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41)
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328)
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28)
    at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327)
    at org.apache.spark.util.collection.CompactBuffer.groupBy(CompactBuffer.scala:28)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:41)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:40)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
My code is as follows:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }

object DateTimeNullReferenceReappear extends App {

  case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0)

  val cfg = new Configuration
  val sparkConf = new SparkConf()
  sparkConf.setAppName("bourne_exception_reappear")
  val sc = new SparkContext(sparkConf)

  val data = TDWSparkContext.tdwTable( // this function just reads data from a data warehouse
    sc,
    tdwuser = FaceConf.TDW_USER,
    tdwpasswd = FaceConf.TDW_PASSWORD,
    dbName = "my_db",
    tblName = "my_table",
    parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
    .map(row => {
      Record(uin = row(2),
        date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
        value = row(4).toDouble)
    }).map(x => (x.uin, (x.date, x.value)))
    .groupByKey
    .map(x => {
      x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throws the exception here
    })

  // val data = TDWSparkContext.tdwTable( // this works, as I don't call DateTime.toString in the groupBy
  //   sc,
  //   tdwuser = FaceConf.TDW_USER,
  //   tdwpasswd = FaceConf.TDW_PASSWORD,
  //   dbName = "hy",
  //   tblName = "t_dw_cf_oss_tblogin",
  //   parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
  //   .map(row => {
  //     Record(uin = row(2),
  //       date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
  //       value = row(4).toDouble)
  //   }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
  //   .groupByKey
  //   .map(x => {
  //     x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
  //   })

  data.take(10).map(println)
}
So it seems that calling toString in the groupBy causes the exception. Can anybody explain it?
Thanks
Upvotes: 11
Views: 5093
Reputation: 20956
Set Spark to use the plain Java serializer instead of Kryo:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
Upvotes: 1
Reputation: 37822
The problem here is bad serialization of Joda's CachedDateTimeZone - it includes a transient field that doesn't get serialized, remaining null in the deserialized object.
You can create and register your own Serializer that handles this object properly:
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.joda.time.DateTimeZone;
import org.joda.time.tz.CachedDateTimeZone;

public class JodaCachedDateTimeZoneSerializer extends Serializer<CachedDateTimeZone> {

    public JodaCachedDateTimeZoneSerializer() {
        setImmutable(true);
    }

    @Override
    public CachedDateTimeZone read(final Kryo kryo, final Input input, final Class<CachedDateTimeZone> type) {
        // reconstruct from serialized ID:
        final String id = input.readString();
        return CachedDateTimeZone.forZone(DateTimeZone.forID(id));
    }

    @Override
    public void write(final Kryo kryo, final Output output, final CachedDateTimeZone cached) {
        // serialize ID only:
        output.writeString(cached.getID());
    }
}
Then, in your class extending KryoRegistrator, add:
kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer())
This way you don't have to disable Kryo or refrain from using Joda.
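If it helps, here is a minimal sketch of what the surrounding registrator could look like; the class name MyKryoRegistrator and the spark.kryo.registrator wiring in the comments are assumptions, not part of the original answer:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.tz.CachedDateTimeZone

// Hypothetical registrator class; enable it with, e.g.:
//   sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//   sparkConf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // register the custom serializer above for Joda's cached time zone
    kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer())
  }
}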
Upvotes: 2
Reputation: 2864
Please refer to this -- https://issues.apache.org/jira/browse/SPARK-4170
Basically, you shouldn't be extending scala.App for your main class. It may not work correctly in some cases. Use an explicit main() method instead.
Here's the documented warning in the Spark 1.6.1 code (in the SparkSubmit class):
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
  printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
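Applied to the code in the question, that would mean dropping extends App in favour of an explicit main method, roughly along these lines (a sketch, with the body of the job omitted):

import org.apache.spark.{ SparkConf, SparkContext }

object DateTimeNullReferenceReappear {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("bourne_exception_reappear")
    val sc = new SparkContext(sparkConf)
    // ... build and run the job exactly as in the question ...
    sc.stop()
  }
}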
Upvotes: 0
Reputation: 11274
You need to either disable Kryo, use Kryo JodaTime Serializers, or avoid serializing the DateTime object, i.e. pass around Longs.
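For instance, the "pass around Longs" option could look roughly like this for the question's pipeline: map to (x.uin, (x.date.getMillis, x.value)) before the shuffle, and rebuild the DateTime only where it is formatted. A sketch follows; the names sumPerDay and pairs are illustrative, not from the question:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime

// Sketch: the shuffle only ever sees (String, (Long, Double)) pairs,
// so no Joda-Time object needs to be serialized at all.
def sumPerDay(pairs: RDD[(String, (Long, Double))]): RDD[Map[String, Double]] =
  pairs.groupByKey.map { x =>
    x._2.groupBy { case (millis, _) => new DateTime(millis).toString("yyyyMMdd") }
      .mapValues(_.map(_._2).sum)
  }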
Upvotes: 12
Reputation: 2394
The issue seems to be that DateTime loses something when it is serialized in Spark (which happens a lot there, I guess). In my case the Chronology was messed up, which caused the same exception.
One really hacky workaround that worked for me is to recreate the DateTime just before using it, e.g.:
date.toMutableDateTime.toDateTime
This seems to restore whatever bits were missing, and everything works after that.
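Applied to the failing line in the question, that would look something like this (a sketch):

map(x => {
  x._2.groupBy(_._1.toMutableDateTime.toDateTime.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)
})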
The solution posted by Marius Soutier to disable Kryo also worked for me. This is a less hacky approach.
Upvotes: 2
Reputation: 13985
We don't know much about the "problem" yet, so we can try the following experiment, which will let us see more about it.
Replace the following part:
map(x => {
  x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throws the exception here
})
With this (note that it requires import scala.util.{ Try, Success, Failure }):
map(x => {
  x._2.groupBy(t => {
    val dateStringTry = Try(t._1.toString("yyyyMMdd")) // t._1 is the DateTime
    dateStringTry match {
      case Success(dateString) => Right(dateString)
      case Failure(e) => {
        println("=========== Null Tuple Description ==========")
        println("Problem Tuple :: [" + t + "]")
        println("Error Info :: [" + e.getMessage + "]")
        // finally the stack trace, if needed
        // e.printStackTrace()
        println("=============================================")
        Left(e)
      }
    }
  })
})
Let's check the result of running this experiment.
Upvotes: 1