Reputation: 123
Hello, I am working on a Scala/Spark project and trying to do some computation. My Scala code works well in the spark-shell, but when I package the same code into a .jar file with sbt-assembly and run it, I get this error:
Unable to find encoder for type AccessLog. An implicit Encoder[AccessLog] is needed to store AccessLog instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
I am trying to convert a Dataset[List[String]] into a Dataset[AccessLog], where AccessLog is a case class, by mapping over it.
The code that generates the error:
import org.apache.spark.sql.{ Dataset, Encoder, SparkSession }
import org.apache.spark.sql.functions._

object DstiJob {
  // try and catch
  def run(spark: SparkSession, inputPath: String, outputPath: String): String = {
    // import spark.sqlContext.implicits._
    import spark.implicits._
    import org.apache.spark.sql.{ Encoder, Encoders }
    // implicit val enc: Encoder[AccessLog] = Encoders.product[AccessLog]

    // note: these shadow the method parameters of the same name
    val inputPath = "access.log.gz"
    val outputPath = "data/reports"
    val logsAsString = spark.read.text(inputPath).as[String]

    case class AccessLog(ip: String, ident: String, user: String, datetime: String,
      request: String, status: String, size: String, referer: String,
      userAgent: String, unk: String)

    val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\""".r

    val dsParsed = logsAsString.flatMap(x => R.unapplySeq(x))

    def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2),
      params(3), params(4), params(5), params(6), params(7), params(8), params(9))

    val ds: Dataset[AccessLog] = dsParsed.map(toAccessLog _)
    val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
    dsWithTime.cache
    dsWithTime.createOrReplaceTempView("AccessLog")
Upvotes: 1
Views: 313
Reputation: 14895
To solve the compilation error, the case class should be defined outside of the method run.
Instead of
object DstiJob {
  def run(spark: SparkSession, ...) {
    [...]
    case class AccessLog(...)
    val ds: Dataset[AccessLog] = ...
    [...]
  }
}
you can use
object DstiJob {
  case class AccessLog(...)

  def run(spark: SparkSession, ...) {
    [...]
    val ds: Dataset[AccessLog] = ...
    [...]
  }
}
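For completeness, here is a minimal compilable sketch of that structure, assembled from the question's own code (the return type is simplified to Unit, and the timestamp conversion and caching are left out):

import org.apache.spark.sql.{ Dataset, SparkSession }

object DstiJob {

  // Declared at object level, so spark.implicits._ can derive its encoder
  case class AccessLog(ip: String, ident: String, user: String, datetime: String,
    request: String, status: String, size: String, referer: String,
    userAgent: String, unk: String)

  def run(spark: SparkSession, inputPath: String, outputPath: String): Unit = {
    import spark.implicits._

    val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\""".r

    def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2),
      params(3), params(4), params(5), params(6), params(7), params(8), params(9))

    val logsAsString = spark.read.text(inputPath).as[String]
    val dsParsed = logsAsString.flatMap(x => R.unapplySeq(x))

    // The implicit Encoder[AccessLog] is now resolved from spark.implicits._
    val ds: Dataset[AccessLog] = dsParsed.map(toAccessLog _)
    ds.createOrReplaceTempView("AccessLog")
  }
}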
This should solve the issue. The reason, as far as I can tell: spark.implicits._ derives Encoder[AccessLog] through a compile-time TypeTag, and the Scala compiler cannot materialize a TypeTag for a case class that is local to a method, so the implicit encoder is never found. Once the class lives in a stable scope such as the enclosing object, the TypeTag, and with it the encoder, can be resolved.
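You can see the underlying limitation without Spark at all; a tiny sketch (the names here are made up for illustration):

import scala.reflect.runtime.universe.TypeTag

object TagDemo {
  case class TopLevel(x: Int)

  def demo(): Unit = {
    case class Local(x: Int)
    val ok = implicitly[TypeTag[TopLevel]]  // fine: stable, statically known type
    // implicitly[TypeTag[Local]]           // fails to compile: no TypeTag available for Local
  }
}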
Upvotes: 2