Neha Somani

PulsarIO.read() failing with AutoValue_PulsarSourceDescriptor not found

What happened?

I have configured the Beam PulsarIO connector to read messages from Pulsar as follows:

```java
PCollection<PulsarMessage> pCollectionAll = p.apply("ReadPulsarMessage", PulsarIO
        .read()
        .withAdminUrl(options.getPulsarAdminURL())
        .withClientUrl(options.getPulsarClientURL())
        .withTopic(options.getPulsarTopic()));
```

I can see that PulsarSourceDescriptor has three mandatory fields (topic, client URL and admin URL), so I set all of them. However, I am not able to read any messages and get the error below:

```
Error message from worker: java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@53cad662
    org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:821)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
    org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Could not find a way to create AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor
    org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator(AutoValueSchema.java:133)
    org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
    org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
    org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:63)
    org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:43)
    org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
    org.apache.beam.sdk.coders.Coder.decode(Coder.java:154)
    org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:142)
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:102)
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:96)
    org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:560)
    org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:542)
    org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
    org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:474)
    org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:452)
    org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:304)
    org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:297)
    org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
    org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:750)
```

We were previously using RabbitMQ and are moving to Pulsar for better scalability. I don't see an expand method in PulsarSourceDescriptor, and I can't find anything related to UnboundedReadFromBoundedSource in PulsarIO.

The AutoValue_PulsarSourceDescriptor class is generated in my local build, but I get the error above when the pipeline is deployed on Dataflow.
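Since the class exists locally but apparently not on the workers, one way to narrow this down is to check whether the generated class actually ends up on the runtime classpath and in the jar staged to Dataflow. The following is a minimal, hypothetical diagnostic sketch: the class AutoValueCheck and its methods are illustrative names, not part of Beam or AutoValue. It assumes AutoValue's standard naming convention (for an annotated class a.b.Outer$Inner, the generated class is a.b.AutoValue_Outer_Inner in the same package); Beam's AutoValueSchema presumably resolves the generated class by a similar name-based lookup, but this is not Beam's code.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.ZipFile;

/**
 * Hypothetical diagnostic helper (not part of Beam or AutoValue).
 * Assumes the AutoValue naming convention: a.b.Outer$Inner -> a.b.AutoValue_Outer_Inner.
 */
public final class AutoValueCheck {

    private AutoValueCheck() {}

    /** Derives the expected AutoValue-generated class name from the annotated class name. */
    public static String generatedName(String annotatedClassName) {
        int lastDot = annotatedClassName.lastIndexOf('.');
        // Keep the package prefix (including the trailing dot), if there is one.
        String pkg = lastDot < 0 ? "" : annotatedClassName.substring(0, lastDot + 1);
        // Nested classes use '$' in binary names; AutoValue joins them with '_'.
        String simple = annotatedClassName.substring(lastDot + 1).replace('$', '_');
        return pkg + "AutoValue_" + simple;
    }

    /** Returns true if the class can be loaded (without initializing it) by the context class loader. */
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, Thread.currentThread().getContextClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns true if the jar (or zip) at jarPath contains a .class entry for className. */
    public static boolean jarContains(Path jarPath, String className) throws IOException {
        String entryName = className.replace('.', '/') + ".class";
        try (ZipFile zip = new ZipFile(jarPath.toFile())) {
            return zip.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String generated = generatedName("com.idfy.beam.pulsar.PulsarSourceDescriptor");
        System.out.println("Expected generated class: " + generated);
        System.out.println("Loadable on this classpath: " + isLoadable(generated));
        // Optionally pass the path of the (uber) jar that is deployed to Dataflow.
        if (args.length > 0) {
            System.out.println("Present in " + args[0] + ": "
                    + jarContains(Paths.get(args[0]), generated));
        }
    }
}
```

Running it with the deployed jar as the argument shows whether AutoValue_PulsarSourceDescriptor was actually packaged. If it is missing, the annotation processor output may not be reaching the build that produces the Dataflow artifact, although that is only one possible cause of this error.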
