I'm trying to connect my Spark 2.4.5 Structured Streaming application to Kafka, but every time I try, this "Failed to find data source" provider error appears. Here are my Scala code and my sbt build:
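import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object streaming_app_demo {
  def main(args: Array[String]): Unit = {
    println("Spark Structured Streaming with Kafka Demo Application Started ...")
    val KAFKA_BOOTSTRAP_SERVERS_CONS = "localhost:9092"
    val KAFKA_TOPIC_NAME_CONS = "input-topic" // hypothetical; the real topic name was not shown
    val spark = SparkSession.builder
      .appName("Spark Structured Streaming with Kafka Demo")
      .getOrCreate()
    // Stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .load()
    // Write key/value as strings to the Kafka topic "test2"
    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test2")
      .option("checkpointLocation", "/tmp/streaming_app_demo_checkpoint") // hypothetical path; the Kafka sink requires a checkpoint location
      .start()
    ds.awaitTermination()
  }
}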
And the error is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at streaming_app_demo$.main(teste.scala:29)
at streaming_app_demo.main(teste.scala)
And my build.sbt is:
name := "scala_212"
version := "0.1"
scalaVersion := "2.12.11"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" % "provided"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0"
For Spark Structured Streaming + Kafka, the spark-sql-kafka-0-10 library is required.
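A minimal sketch of the dependency declaration, assuming Spark 2.4.5 on Scala 2.12 as in the question's build and a fat jar built with sbt-assembly. The Kafka connector is not part of the Spark distribution, so marking it "provided" (as the question's build does) keeps it out of the assembled jar. The alternative, described in the official integration guide, is to leave it out of the jar and pass it at submit time with spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5.

```scala
// build.sbt (sketch): same versions as the question's build
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.5",
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  // No "provided" here: Spark itself does not ship this connector,
  // so it has to be packaged into the application jar.
  // It already depends on kafka-clients, so the explicit kafka-clients line is omitted.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
)
```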
You are getting the org.apache.spark.sql.AnalysisException: Failed to find data source: kafka exception because the spark-sql-kafka library is not available on your classpath, so Spark is unable to find the org.apache.spark.sql.sources.DataSourceRegister file inside the META-INF/services folder. Spark resolves the short name "kafka" by looking up the providers registered in that file.
(Screenshot: the DataSourceRegister path inside the jar file, META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.)
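To check whether that service file, and the Kafka provider listed in it, is actually visible at runtime, a small diagnostic like the following can help. It is a hypothetical helper (not part of Spark) that uses only the standard ClassLoader.getResources call; it assumes the Kafka connector registers org.apache.spark.sql.kafka010.KafkaSourceProvider, which is the class behind the "kafka" short name.

```scala
import java.net.URL
import scala.collection.mutable.ListBuffer
import scala.io.Source

// Hypothetical diagnostic helper, not part of Spark.
object DataSourceRegisterCheck {
  val ServiceFile = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"

  // Parses one service file: one class name per line, '#' starts a comment.
  def parseServiceFile(content: String): List[String] =
    content.split("\n").toList
      .map(line => line.takeWhile(_ != '#').trim)
      .filter(_.nonEmpty)

  // Collects registered class names from every matching file on the classpath.
  def registeredProviders(
      loader: ClassLoader = Thread.currentThread().getContextClassLoader): List[String] = {
    val urls = loader.getResources(ServiceFile)
    val buffer = ListBuffer.empty[String]
    while (urls.hasMoreElements) {
      val url: URL = urls.nextElement()
      val source = Source.fromURL(url, "UTF-8")
      try buffer ++= parseServiceFile(source.mkString)
      finally source.close()
    }
    buffer.toList
  }

  def main(args: Array[String]): Unit = {
    val providers = registeredProviders()
    println(s"Registered DataSourceRegister providers: ${providers.mkString(", ")}")
    val kafkaFound = providers.contains("org.apache.spark.sql.kafka010.KafkaSourceProvider")
    println(s"Kafka provider on classpath: $kafkaFound")
  }
}
```

Running this with the same classpath (or inside the same fat jar) as the streaming app shows directly whether the "kafka" provider can be found.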
If you are using SBT with the sbt-assembly plugin, try adding the code block below to your build. It makes sure the org.apache.spark.sql.sources.DataSourceRegister file is included in your final jar.
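// META-INF handling for sbt-assembly
assemblyMergeStrategy in assembly := {
  // Merge service registration files (including DataSourceRegister) from all jars,
  // keeping each distinct line; this case must come before the generic META-INF case
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  // Drop everything else under META-INF (manifests, signatures, ...)
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case "application.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}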
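Why the services case matters can be shown with a simplified simulation. This is not sbt-assembly's actual implementation, just a sketch of the effect of each strategy on the DataSourceRegister file when several jars (for example spark-sql and spark-sql-kafka-0-10) each contain one. The file contents below are abbreviated and illustrative.

```scala
// Simplified simulation of sbt-assembly merge strategies for one service file.
object MergeStrategySimulation {
  // Each entry: (jar name, lines of that jar's DataSourceRegister file)
  type ServiceFiles = List[(String, List[String])]

  // Like MergeStrategy.filterDistinctLines: concatenate all copies,
  // keeping each distinct line once, so every jar's registrations survive.
  def filterDistinctLines(files: ServiceFiles): List[String] =
    files.flatMap(_._2).distinct

  // Like MergeStrategy.first: only the first jar's copy is kept.
  def first(files: ServiceFiles): List[String] =
    files.headOption.map(_._2).getOrElse(Nil)

  // Like MergeStrategy.discard: the file is dropped from the fat jar entirely.
  def discard(files: ServiceFiles): List[String] = Nil

  def main(args: Array[String]): Unit = {
    val files: ServiceFiles = List(
      "spark-sql" -> List("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat"),
      "spark-sql-kafka-0-10" -> List("org.apache.spark.sql.kafka010.KafkaSourceProvider")
    )
    println(s"filterDistinctLines: ${filterDistinctLines(files)}")
    println(s"first:               ${first(files)}")
    println(s"discard:             ${discard(files)}")
  }
}
```

With first, the Kafka entry is lost if another jar's copy wins; with discard, no data source is registered at all. Only filterDistinctLines keeps the "kafka" provider alongside the built-in ones.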
