sqoor

Unable to find encoder for type AccessLog. An implicit Encoder[AccessLog] is needed to store AccessLog instances in a Dataset

Hello, I am working on a Scala/Spark project that does some computation. My Scala code works well in spark-shell, but when I try to build the same code into a .jar file with sbt-assembly, I get this error:

Unable to find encoder for type AccessLog. An implicit Encoder[AccessLog] is needed to store AccessLog instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

I am trying to convert a Dataset[List[String]] into a Dataset[AccessLog], where AccessLog is a case class, by mapping over it.

The code that generates the error:

import org.apache.spark.sql.{ Dataset, Encoder, SparkSession }
import org.apache.spark.sql.functions._

object DstiJob {

  // try and catch
  def run(spark: SparkSession, inputPath: String, outputPath: String): String = {
    // import spark.sqlContext.implicits._
    import spark.implicits._
    import org.apache.spark.sql.{ Encoder, Encoders }
    // implicit val enc: Encoder[AccessLog] = Encoders.product[AccessLog]

    val inputPath = "access.log.gz"
    val outputPath = "data/reports"
    val logsAsString = spark.read.text(inputPath).as[String]

    case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)

    val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\""".r
    val dsParsed = logsAsString.flatMap(x => R.unapplySeq(x))
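    // Capture groups in order: ip, identd, user, datetime, request, status, size, referer, useragent, unk
    def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2), params(3), params(4), params(5), params(6), params(7), params(8), params(9))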

    val ds: Dataset[AccessLog] = dsParsed.map(toAccessLog _)
    val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
    dsWithTime.cache
    dsWithTime.createOrReplaceTempView("AccessLog")

Answers (1)

werner

To solve the compilation error, the case class should be defined outside of the method run.

Instead of

object DstiJob {

    def run(spark: SparkSession, ...) {
       [...]
       case class AccessLog(...)
       val ds: Dataset[AccessLog] = ...
       [...]
    }
}

you can use

object DstiJob {

   case class AccessLog(...)

   def run(spark: SparkSession, ...) {
       [...]  
       val ds: Dataset[AccessLog] = ...
       [...]
   }
}

This should solve the issue, but unfortunately I cannot explain why this helps.
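
One likely explanation (an assumption, not confirmed in this thread): the implicit encoder for case classes that spark.implicits._ provides needs a TypeTag for the class, and the Scala compiler cannot produce a TypeTag for a class declared locally inside a method, so no Encoder[AccessLog] is found. A minimal sketch of the fixed layout is below. The names DstiJobSketch, the three-field AccessLog and the in-memory input are hypothetical stand-ins for the real job, chosen only to keep the example short.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object DstiJobSketch {

  // Declared at object level, not inside a method, so that the implicit
  // Encoder[AccessLog] from spark.implicits._ can be derived.
  // Reduced to three fields for brevity (assumption, not the full class).
  case class AccessLog(ip: String, status: String, size: String)

  // Hypothetical helper: builds an AccessLog from one list of captured fields.
  def toAccessLog(params: List[String]): AccessLog =
    AccessLog(params(0), params(1), params(2))

  def run(spark: SparkSession): Dataset[AccessLog] = {
    import spark.implicits._

    // Stand-in for the parsed log lines (a Dataset[List[String]] in the question).
    val dsParsed: Dataset[List[String]] =
      Seq(List("127.0.0.1", "200", "512")).toDS()

    // Compiles because the case class is no longer local to this method.
    dsParsed.map(toAccessLog _)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("encoder-sketch")
      .master("local[*]")
      .getOrCreate()
    try run(spark).show(truncate = false)
    finally spark.stop()
  }
}
```

Moving the case class back inside run should reproduce the "Unable to find encoder" error at compile time, which is a quick way to check whether this is the cause in a given project.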
