user8236853

Spark is not able to serialize the task when using a UDF in filter

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:32)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:16)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:20)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$.run(Runner.scala:20)
    at org.exadatum.ddq.core.RunCheck.<init>(RunCheck.scala:104)
    at org.exadatum.ddq.core.DQJobTrigger$.main(DQJobTrigger.scala:39)
    at org.exadatum.ddq.core.DQJobTrigger.main(DQJobTrigger.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1d9bd4d6)
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint, name: sc, type: class org.apache.spark.SparkContext)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint, DateFormatConstraint(startdate,java.text.SimpleDateFormat@4f76f1a0,org.apache.spark.SparkContext@1d9bd4d6,xdqdemo.customer_details))
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, )
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, )
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, )
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(startdate#2))
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(UDF(startdate#2)))
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, name: predicates, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, InMemoryColumnarTableScan [phone_number#0,name#1,startdate#2], [UDF(startdate#2)], InMemoryRelation [phone_number#0,name#1,startdate#2], true, 10000, StorageLevel(false, true, false, true, 1), ConvertToUnsafe, None)
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, )
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[8] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@316975be)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@316975be))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@526fbb80)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@526fbb80))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 39 more
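The serialization stack pinpoints the problem: the UDF is an anonymous function defined inside DateFormatConstraint, so its closure keeps an $outer reference to the enclosing instance, and that instance carries a field sc of type SparkContext, which is not serializable. A minimal sketch of the same capture pattern (class, field, and column names here are illustrative, not the actual project code):

import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Hypothetical reduction of the failing pattern.
case class BadConstraint(sc: SparkContext, marker: String) {
  def run(df: DataFrame): Long = {
    // Reading `marker` compiles to `this.marker`, so the closure captures
    // the whole BadConstraint instance, dragging the SparkContext with it.
    val isMarked = udf((s: String) => s != null && s.contains(marker))
    df.filter(isMarked(df("name"))).count() // task serialization fails here
  }
}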

Code snippet:

val fun = (df: DataFrame) => {

  format.setLenient(false)
  val cannotBeDate = udf((column: String) => column != null && Try(format.parse(column)).isFailure)
  val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(columnName))).count)

  // Utility to persist all of the bad records
  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._

  // Writing all bad records
  val writeToHiveDf = df.filter(cannotBeDate(new Column(columnName)))

  val recordLists = new ListBuffer[List[(String, String, String)]]()
  writeToHiveDf.rdd.collect().foreach { row =>
    val item = row.mkString("-")
    val recordList: List[(String, String, String)] =
      List(List(tableName, "ALWAYS_NULL_CONSTRAINT", item)).map { case List(a, b, c) => (a, b, c) }
    recordLists += recordList
  }
  val listRDD = sc.parallelize(recordLists.flatten)
  val dataFrameToHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data")
  dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records")

  DateFormatConstraintResult(
    this,
    data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData),
    status = ConstraintUtil.tryToStatus[Long](maybeCannotBeDateCount, _ == 0)
  )
}
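Side note: the collect()-then-parallelize round trip through the driver is not needed to build the bad-records table; the rows can be derived directly on the cluster. A sketch, assuming the same writeToHiveDf, tableName, and hiveContext as in the snippet above, and that tableName is a plain serializable String (copy it to a local first if it lives on the constraint class):

// Derive the bad-records rows without collecting to the driver.
import hiveContext.implicits._

val dataFrameToHive: DataFrame = writeToHiveDf.rdd
  .map(row => (tableName, "ALWAYS_NULL_CONSTRAINT", row.mkString("-")))
  .toDF("table_name", "constraint_applied", "data")

dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records")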

Upvotes: 0

Views: 1369

Answers (1)

user8236853

import java.text.SimpleDateFormat
import scala.util.Try
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.udf

object checkConstraint extends Serializable {
  def checkDateFormat(format: SimpleDateFormat, df: DataFrame, columnName: String): DataFrame = {
    format.setLenient(false)
    // Only serializable locals and parameters are captured by the UDF's
    // closure; there is no reference back to the constraint class or its sc.
    val parseFails = (column: String) => Try(format.parse(column)).isFailure
    val cannotBeDate = udf((column: String) => column != null && parseFails(column))
    df.filter(cannotBeDate(new Column(columnName)))
  }
}


val writeToHiveDf = checkConstraint.checkDateFormat(format, df, columnName)

All of the computation is now packed inside a serializable singleton object, so the UDF's closure no longer captures the enclosing constraint class (and with it the non-serializable SparkContext); the method simply returns the required DataFrame.
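If the constraint class has to keep its SparkContext, another common remedy (a sketch, not part of this answer) is to mark the reference @transient and copy the fields the UDF needs into local variables, so the closure captures only serializable values:

import java.text.SimpleDateFormat
import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.udf

// Hypothetical variant: keep sc on the class but exclude it from closures.
class SafeDateConstraint(@transient val sc: SparkContext,
                         val format: SimpleDateFormat,
                         val columnName: String) extends Serializable {

  def badRows(df: DataFrame): DataFrame = {
    val fmt = format       // copy fields into locals: the lambda below then
    val name = columnName  // captures these values instead of `this`
    val cannotBeDate = udf((s: String) => s != null && Try(fmt.parse(s)).isFailure)
    df.filter(cannotBeDate(new Column(name)))
  }
}

With the locals in place the closure never touches `this` at all, so the @transient marker is belt-and-braces; either measure alone keeps the SparkContext out of the serialized task.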

Upvotes: 0
