AlexisBRENON


How to use PubSubLite Spark connector in custom pyspark container for Dataproc serverless

Edit

Using image version 2.0 works as expected (no error), so this looks like a regression between --version 2.0 and --version 2.1.


Initial message

TL;DR

I submit a Dataproc Serverless job that reads from Pub/Sub Lite with the following command:

gcloud dataproc batches submit pyspark \
  ./main.py \
  --region europe-west1 \
  --deps-bucket gs://... \
  --jars gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar \
  --version 2.1

I get the following error:

23/10/04 14:02:23 ERROR MicroBatchExecution: Query [id = e1d920ae-c3fd-47ab-8139-474080e76a16, runId = 40153dad-ce4f-4d2a-ae7f-b860ab145ab2] terminated with error
java.lang.IncompatibleClassChangeError: Class com.google.longrunning.GetOperationRequest does not implement the requested interface repackaged.com.google.protobuf.MessageLite
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.<init>(ProtoLiteUtils.java:128)
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils.marshaller(ProtoLiteUtils.java:84)
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.ProtoUtils.marshaller(ProtoUtils.java:57)
        at com.google.longrunning.stub.GrpcOperationsStub.<clinit>(GrpcOperationsStub.java:68)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:130)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:116)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.create(GrpcCursorServiceStub.java:96)
        at com.google.cloud.pubsublite.v1.stub.CursorServiceStubSettings.createStub(CursorServiceStubSettings.java:201)
        at com.google.cloud.pubsublite.v1.CursorServiceClient.<init>(CursorServiceClient.java:160)
        at com.google.cloud.pubsublite.v1.CursorServiceClient.create(CursorServiceClient.java:142)
        at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorServiceClient(PslReadDataSourceOptions.java:155)
        at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorClient(PslReadDataSourceOptions.java:169)
        at com.google.cloud.pubsublite.spark.PslMicroBatchStream.<init>(PslMicroBatchStream.java:59)
        at com.google.cloud.pubsublite.spark.PslScanBuilder.toMicroBatchStream(PslScanBuilder.java:110)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:107)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:454)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:100)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:84)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:456)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:84)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:298)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Exception in thread "stream execution thread for [id = e1d920ae-c3fd-47ab-8139-474080e76a16, runId = 40153dad-ce4f-4d2a-ae7f-b860ab145ab2]" java.lang.IncompatibleClassChangeError: Class com.google.longrunning.GetOperationRequest does not implement the requested interface repackaged.com.google.protobuf.MessageLite
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.<init>(ProtoLiteUtils.java:128)
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.lite.ProtoLiteUtils.marshaller(ProtoLiteUtils.java:84)
        at com.google.cloud.pubsublite.repackaged.io.grpc.protobuf.ProtoUtils.marshaller(ProtoUtils.java:57)
        at com.google.longrunning.stub.GrpcOperationsStub.<clinit>(GrpcOperationsStub.java:68)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:130)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.<init>(GrpcCursorServiceStub.java:116)
        at com.google.cloud.pubsublite.v1.stub.GrpcCursorServiceStub.create(GrpcCursorServiceStub.java:96)
        at com.google.cloud.pubsublite.v1.stub.CursorServiceStubSettings.createStub(CursorServiceStubSettings.java:201)
        at com.google.cloud.pubsublite.v1.CursorServiceClient.<init>(CursorServiceClient.java:160)
        at com.google.cloud.pubsublite.v1.CursorServiceClient.create(CursorServiceClient.java:142)
        at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorServiceClient(PslReadDataSourceOptions.java:155)
        at com.google.cloud.pubsublite.spark.PslReadDataSourceOptions.newCursorClient(PslReadDataSourceOptions.java:169)
        at com.google.cloud.pubsublite.spark.PslMicroBatchStream.<init>(PslMicroBatchStream.java:59)
        at com.google.cloud.pubsublite.spark.PslScanBuilder.toMicroBatchStream(PslScanBuilder.java:110)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:107)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:454)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:100)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:84)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:456)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:84)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:298)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)

How can I fix it?


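From what I understand, this looks like a Java packaging (class relocation) error: the connector's repackaged gRPC code expects the relocated repackaged.com.google.protobuf.MessageLite interface, but com.google.longrunning.GetOperationRequest does not implement it.

However, all of these classes seem to come from the provided gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar, which I don't manage.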
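One way to check where the classes named in the error live is to list the entries of the connector jar, since a jar is a zip archive. The snippet below is a minimal sketch: the function name `find_class_entries` and the local file name in the usage comment are assumptions for illustration (the jar would first have to be copied locally from the gs:// path), and it only inspects this one jar, not whatever else the 2.1 runtime puts on the classpath.

```python
import zipfile

# Class files named in the error message, written as jar entry suffixes.
# Matching by suffix also finds relocated copies such as
# "repackaged/com/google/protobuf/MessageLite.class".
CLASS_SUFFIXES = (
    "com/google/longrunning/GetOperationRequest.class",
    "com/google/protobuf/MessageLite.class",
)


def find_class_entries(jar_file, suffixes=CLASS_SUFFIXES):
    """Return {suffix: [jar entries ending with that suffix]}.

    `jar_file` is a path or a binary file-like object (hypothetical helper).
    """
    with zipfile.ZipFile(jar_file) as jar:
        names = jar.namelist()
    return {
        suffix: [name for name in names if name.endswith(suffix)]
        for suffix in suffixes
    }


# Example usage, assuming the jar was downloaded to the working directory:
# for suffix, entries in find_class_entries(
#     "pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar"
# ).items():
#     print(suffix, "->", entries or "not in this jar")
```

If the jar contains no copy of GetOperationRequest at all, the class in the stack trace is presumably being loaded from somewhere else on the runtime classpath, which would be consistent with the behaviour changing between image versions.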

Here is my Spark code:

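from pyspark import sql as psql

# Reuse the active session, or create one if none exists.
spark = psql.SparkSession.builder.getOrCreate()

# Stream from the Pub/Sub Lite subscription (path elided).
input_ = (
    spark.readStream.format("pubsublite")
    .option("pubsublite.subscription", "...")
    .load()
)

# Print each micro-batch to the console every 2 seconds.
query = (
    input_.writeStream.trigger(processingTime="2 seconds")
    .format("console")
    .start()
)

# Wait at most 10 seconds for the query to terminate.
query.awaitTermination(10)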

Kind
