Reputation: 3079
Using image version 2.0 works as expected (no error), so this bug appears to be a regression from --version 2.0 to --version 2.1.
I submit a Dataproc Serverless PySpark batch job that reads from Pub/Sub Lite, using the following command line:
gcloud dataproc batches submit pyspark \
    ./main.py \
    --region europe-west1 \
    --deps-bucket gs://... \
    --jars gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar \
    --version 2.1
I get the following error:
23/10/04 14:02:23 ERROR MicroBatchExecution: Query [id = e1d920ae-c3fd-47ab-8139-474080e76a16, runId = 40153dad-ce4f-4d2a-ae7f-b860ab145ab2] terminated with error
java.lang.IncompatibleClassChangeError: Class com.google.longrunning.GetOperationRequest does not implement the requested interface repackaged.com.google.protobuf.MessageLite
at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.<init>(ProtoLiteUtils.java:128)
at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils.marshaller(ProtoLiteUtils.java:84)
at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.ProtoUtils.marshaller(ProtoUtils.java:57)
at com.google.longrunning.stub.GrpcOperationsStub.<clinit>(GrpcOperationsStub.java:68)
at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:130)
at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:116)
at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.create(GrpcCursorServiceStub.java:96)
at com.google.cloud.pubsublite.v1.stub.CursorServiceStubSettings.createStub(CursorServiceStubSettings.java:201)
at com.google.cloud.pubsublite.v1.CursorServiceClient.<init>(CursorServiceClient.java:160)
at com.google.cloud.pubsublite.v1.CursorServiceClient.create(CursorServiceClient.java:142)
at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorServiceClient(PslReadDataSourceOptions.java:155)
at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorClient(PslReadDataSourceOptions.java:169)
at com.google.cloud.pubsublite.spark.PslMicroBatchStream.<init>(PslMicroBatchStream.java:59)
at com.google.cloud.pubsublite.spark.PslScanBuilder.toMicroBatchStream(PslScanBuilder.java:110)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:107)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:454)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:100)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:456)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:84)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:64)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:298)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Exception in thread "stream execution thread for [id = e1d920ae-c3fd-47ab-8139-474080e76a16, runId = 40153dad-ce4f-4d2a-ae7f-b860ab145ab2]" java.lang.IncompatibleClassChangeError: Class com.google.longrunning.GetOperationRequest does not implement the requested interface repackaged.com.google.protobuf.MessageLite
	(same stack trace as above)
How can I fix it?
From what I understand, this is a Java packaging (shading) problem. The constructor path under com.google.longrunning.stub.GrpcOperationsStub expects message objects implementing the shaded interface repackaged.com.google.protobuf.MessageLite, but com.google.longrunning.GetOperationRequest does not implement that interface (I suspect it implements the un-shaded com.google.protobuf.MessageLite instead). However, all of these classes come from the provided jar gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar, which I don't manage.
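One way to sanity-check that diagnosis is to look inside the connector jar for both shaded and un-shaded protobuf class paths. A jar is just a zip archive, so this can be done in Python; the sketch below assumes you have downloaded a local copy of the jar (e.g. with gsutil cp), and the function name and candidate entry paths are mine, not something from the connector:

```python
import zipfile


def find_shading_conflicts(jar_path):
    """Return the suspect protobuf class entries present in the jar.

    If both an un-shaded and a relocated (shaded) copy of MessageLite
    show up, classes compiled against one copy cannot satisfy code that
    expects the other, which matches the IncompatibleClassChangeError.
    """
    suspects = (
        "com/google/protobuf/MessageLite.class",
        "repackaged/com/google/protobuf/MessageLite.class",
        "com/google/cloud/pubsublite/repackaged/com/google/protobuf/MessageLite.class",
    )
    with zipfile.ZipFile(jar_path) as jar:
        names = set(jar.namelist())
    return sorted(s for s in suspects if s in names)
```

If more than one of these paths is present (or com/google/longrunning/GetOperationRequest.class sits next to the un-shaded protobuf classes), the jar bundles inconsistent copies and the failure happens at class-load time, before any user code runs.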
Here is my Spark code:
from pyspark import sql as psql

spark = psql.SparkSession.Builder().getOrCreate()

input_ = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        "...",
    )
    .load()
)

query = (
    input_.writeStream.trigger(processingTime="2 seconds")
    .format("console")
    .start()
)
query.awaitTermination(10)
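For intuition about why the error occurs before this code even processes a batch: in the JVM, a shaded (relocated) interface and its un-shaded original are unrelated types, even when their source is identical, so a class compiled against one cannot be passed where the other is required. A rough Python analogy (all names here are mine and purely illustrative):

```python
import abc


# Two structurally identical "MessageLite" interfaces, as if one copy had
# been relocated into a different package by a shading step at build time.
class MessageLite(abc.ABC):
    @abc.abstractmethod
    def serialize(self): ...


class RepackagedMessageLite(abc.ABC):
    @abc.abstractmethod
    def serialize(self): ...


# GetOperationRequest was compiled against the un-shaded interface...
class GetOperationRequest(MessageLite):
    def serialize(self):
        return b""


req = GetOperationRequest()
# ...so a caller that insists on the shaded interface rejects it, even
# though the two interfaces look exactly the same.
assert isinstance(req, MessageLite)
assert not isinstance(req, RepackagedMessageLite)
```

In Java the mismatch surfaces as IncompatibleClassChangeError at class-initialization time, which is why the stream terminates before the first micro-batch.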
Upvotes: 2
Views: 137