HIREN GALA

Reputation: 109

Exception in thread "main" org.apache.spark.sql.AnalysisException:

I am trying my hand at Kafka Spark Structured Streaming but am getting the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'device' given input columns: [value, offset, partition, key, timestamp, timestampType, topic];

Here is my code:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

case class DeviceData(device: String, deviceType: String, signal: String)

object dataset_kafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load()

    println(df.isStreaming)
    println(df.printSchema())

    val ds: Dataset[DeviceData] = df.as[DeviceData]

    val values = df.select("device").where("signal == Strong")

    values.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}

Any help on how to solve this?

Upvotes: 0

Views: 4256

Answers (1)

nonsleepr

Reputation: 811

A Kafka stream always produces the following fields: value, offset, partition, key, timestamp, timestampType, topic. In your case you're interested in value, but be aware that values are always delivered as byte arrays, so a cast to string is required before deserializing the JSON.
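For reference, this is roughly the schema a Kafka source produces (the same columns listed in your error message, which is why 'device' cannot be resolved on it directly):

df.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)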

Try the following code:

import spark.implicits._

val kafkaStream =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "172.21.0.187:9093")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

// If you don't want to build the schema manually,
// derive it from the case class
import org.apache.spark.sql.Encoders
val schema = Encoders.product[DeviceData].schema

// Cast the binary value to a string, parse the JSON into a struct,
// then flatten the struct into top-level columns so the DeviceData
// encoder can resolve device, deviceType and signal by name
import org.apache.spark.sql.functions.from_json
val ds = kafkaStream
  .select(from_json($"value".cast("string"), schema).alias("data"))
  .select("data.*")
  .as[DeviceData]

val values = ds.filter(_.signal == "Strong").map(_.device)
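From there, values is a streaming Dataset[String], so you can attach the same console sink you already had in your question:

values.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()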

Upvotes: 1
