Reputation: 35
query = info_df_fin.selectExpr("to_json(struct(*)) AS value")\
.writeStream\
.format("kafka")\
.option("checkpointLocation", "chkpint_directory")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("topic", "Dest_topic")\
.start()
This is the code I am using to send streaming data from one Kafka topic to another, but I am getting an error even though all the jar files are in place, including "spark-streaming-kafka-0-10_2.12-2.4.0.jar".
This is the error:
23/09/18 11:15:58 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/09/18 11:15:58 ERROR MicroBatchExecution: Query [id = 23d94359-dabf-41ef-8617-a62859a35adb, runId = 57e0c21c-5245-40c5-9878-be1b5b06e106] terminated with error
java.lang.NoClassDefFoundError: org/apache/spark/sql/internal/connector/SupportsStreamingUpdate
    at java.base/java.lang.ClassLoader.defineClass1(Native Method)
    at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
    at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
    at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800)
    at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698)
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621)
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable.newWriteBuilder(KafkaSourceProvider.scala:397)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$.org$apache$spark$sql$execution$datasources$v2$V2Writes$$newWriteBuilder(V2Writes.scala:144)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:90)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
I am trying to forward the streamed data from one Kafka topic to another.
Upvotes: 0
Views: 431
Reputation: 627
You are most likely running a mismatched version of Spark.
If your Kafka jar file is spark-streaming-kafka-0-10_2.12-2.4.0.jar, you should be running Spark 2.4 with Scala 2.12.
Alternatively, use one of the latest stable versions of Spark, such as 3.3, and update the jar file name to spark-streaming-kafka-0-10_2.12-3.3.0.jar so the connector version matches your Spark runtime. Also note that for Structured Streaming's writeStream.format("kafka") (your stack trace shows org.apache.spark.sql.kafka010.KafkaSourceProvider), the required package is spark-sql-kafka-0-10, not spark-streaming-kafka-0-10, which belongs to the older DStream API.
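As a concrete sketch, assuming Spark 3.3.0 with Scala 2.12 (check what spark.version reports and substitute your actual version), letting --packages resolve the connector and its transitive dependencies avoids hand-copying jar files; your_streaming_job.py here is a placeholder for your script:

```shell
# Let spark-submit download the Structured Streaming Kafka connector
# that matches the Spark runtime, instead of managing jars by hand.
# The version suffix (3.3.0) must equal your installed Spark version.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  your_streaming_job.py
```

This also pulls in kafka-clients and the connector's other dependencies automatically, which a single copied jar does not.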
Upvotes: 0