PRONKERIJ

Reputation: 95

ERROR MicroBatchExecution - PySpark: Writing dataframe to Elasticsearch

I'm trying to write a stream to Elasticsearch using PySpark. I have two dataframes that I read from Kafka and join into df_joined. Printing df_joined to the terminal shows the correct columns and values. But when I try to write it to Elasticsearch (on localhost:9200) using this code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark_session = SparkSession.builder \
    .appName("spark-test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,"
            "org.elasticsearch:elasticsearch-hadoop:8.5.3") \
    .getOrCreate()

df1 = ...
df2 = ...  # for df1 and df2 I do .select(from_json(col('value'), schema))

df_joined = df1.join(df2, df1.fk == df2.pk)

query = df_joined \
    .writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "name_of_index/name_of_type") \
    .option("es.mapping.id", "id") \
    .option("es.spark.sql.streaming.sink.log.enabled", "false") \
    .option("checkpointLocation", "/tmp/es_checkpoint") \
    .start()

query.awaitTermination()
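
For reference, the Kafka reads are along these lines (bootstrap servers, topic name, and schema below are placeholders rather than my real values):

df1 = spark_session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "some_topic") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")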

I get the following error/stack trace:

22/12/28 00:41:48 ERROR MicroBatchExecution: Query [id = 3b9b239f-bd38-43e0-820a-6d8fdac56e79, runId = e4869668-2e3c-4e8e-a6be-6c111a832812] terminated with error
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.addBatch(EsSparkSqlStreamingSink.scala:62)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:666)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Exception in thread "stream execution thread for [id = 3b9b239f-bd38-43e0-820a-6d8fdac56e79, runId = e4869668-2e3c-4e8e-a6be-6c111a832812]" java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.addBatch(EsSparkSqlStreamingSink.scala:62)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:666)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

Upvotes: 1

Views: 439

Answers (1)

Koedlt

Reputation: 5973

If you look at the most interesting piece of your error, you see the following:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;

So the JVM can't find a method on SQLExecution called withNewExecutionId that takes a SparkSession, a QueryExecution and a Function as input. That is exactly what the descriptor above encodes: parameter types (SparkSession, QueryExecution, Function0) and a return type of Object. Let's have a look at that method in Spark's source code.

In version 3.3.1, we see the following function signature:

def withNewExecutionId[T](
      queryExecution: QueryExecution,
      name: Option[String] = None)(body: => T): T

This explains why you're getting a NoSuchMethodError: there is no method with the function signature you're expecting!
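As a quick sanity check on your side, you can print the Spark version your session is actually running together with the connector packages you configured, since those determine which SQLExecution and which sink implementation end up on the classpath:

# Standard PySpark APIs: the runtime version and the packages you requested
print(spark_session.version)
print(spark_session.sparkContext.getConf().get("spark.jars.packages", ""))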

Now, let's look at the dependencies of your artifact in Maven Repository. For Spark, we're seeing this:

[Image: Maven Repository dependency table for the artifact, showing the Spark dependency at version 2.3.0]

with the left column being the dependency version and the right column the newest stable update to that dependency. Let's have a look at the dependency version, 2.3.0.

In version 2.3.0, we see the following function signature:

def withNewExecutionId[T](
      sparkSession: SparkSession,
      queryExecution: QueryExecution)(body: => T): T

That looks like what we're expecting! A SparkSession, a QueryExecution and a Function as input (the by-name body: => T parameter compiles to a scala.Function0, which is where the Lscala/Function0; in the descriptor comes from).

Solution: it seems like this artifact expects Spark 2.3.0 and is not compatible with newer versions, at least not the way you're using it right now. I don't know this artifact well, so maybe there are ways around this. But try it out with Spark 2.3.0 and see what happens :)
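
If you want to try that, here is a minimal sketch of what the downgrade could look like. Note that Spark 2.3.x is built against Scala 2.11, so the Kafka connector coordinates change as well; treat the exact versions here as my best guess, not something I have verified against this connector:

# First install a matching Spark runtime outside of Python, e.g.: pip install pyspark==2.3.0
from pyspark.sql import SparkSession

spark_session = SparkSession.builder \
    .appName("spark-test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,"  # Scala 2.11 build matching Spark 2.3.x
            "org.elasticsearch:elasticsearch-hadoop:8.5.3") \
    .getOrCreate()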

Upvotes: 2
