ImbaBalboa

Reputation: 867

Why does reading stream from Kafka fail with "Unable to find encoder for type stored in a Dataset"?

I am trying to use Spark Structured Streaming with Kafka.

import org.apache.spark.sql.SparkSession

object StructuredStreaming {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StructuredStreaming <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    // Subscribe to 1 topic
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9093")
      .option("subscribe", "sparkss")
      .load()
    lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
  }
}

I took this code from the Spark documentation, and I get this build error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
      .as[(String, String)]

I read in another SO post that this error is caused by a missing import spark.implicits._, but I already have that import and the error is still there.

UPDATE: here is my Maven configuration:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <slf4j.version>1.7.12</slf4j.version>
    <spark.version>2.1.0</spark.version>
    <scala.version>2.10.4</scala.version>
    <scala.binary.version>2.10</scala.binary.version>
</properties>

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

Upvotes: 2

Views: 413

Answers (1)

ImbaBalboa

Reputation: 867

I tried again with Scala 2.11.8:

<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

</dependencies>

With the dependencies switched to the corresponding Scala 2.11 artifacts, it eventually worked.

Warning: you need to restart your project in IntelliJ after changing the Scala version. I think IntelliJ has problems picking up the change without a restart, because the errors are still shown until then.
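
One way to keep each artifact's Scala suffix in step with the Scala version is to reuse the properties already declared in the pom instead of hard-coding "_2.11" and "2.1.0" in every dependency. This is only a sketch, assuming the <properties> block defines scala.binary.version (2.11) and spark.version (2.1.0) as in the snippets above:

```xml
<dependencies>
    <!-- The artifact suffix and version are resolved from <properties>,
         so changing the Scala version only requires editing one place -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
```

With this layout, a mismatch between the Scala version and the Spark artifacts is harder to introduce by accident, although the IDE may still need the project to be re-imported after the change.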

Upvotes: 0
