When using the Apache Flink SQL connector for Pulsar, I consistently get an Avro deserialization error when reading from a Flink table that is backed by a Pulsar topic with an Avro schema.
The detailed stack trace from the Flink SQL client log file is below:
2023-07-26 09:50:25,521 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'collect' (4193eea221aba3b842e24ba369d754b8).
2023-07-26 09:50:26,696 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Successfully submitted job 'collect' (4193eea221aba3b842e24ba369d754b8) to 'http://localhost:8081'.
2023-07-26 09:54:32,939 INFO org.apache.flink.table.client.gateway.local.LocalExecutor [] - Cancelling job 4193eea221aba3b842e24ba369d754b8 and result retrieval.
2023-07-26 09:54:32,959 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.2.jar:1.16.2]
Caused by: java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.2.jar:1.16.2]
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.2.jar:1.16.2]
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 4193eea221aba3b842e24ba369d754b8)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) ~[?:?]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.2.jar:1.16.2]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 4193eea221aba3b842e24ba369d754b8)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302) ~[flink-dist-1.16.2.jar:1.16.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.2.jar:1.16.2]
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[?:?]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[?:?]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.16.2.jar:1.16.2]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.16.2.jar:1.16.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.16.2.jar:1.16.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.2.jar:1.16.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.2.jar:1.16.2]
at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?]
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchema.deserialize(PulsarTableDeserializationSchema.java:105) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:54) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:36) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-files-1.16.2.jar:1.16.2]
at org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:109) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.2.jar:1.16.2]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -62 out of bounds for length 2
at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) ~[flink-sql-avro-1.16.2.jar:1.16.2]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchema.deserialize(PulsarTableDeserializationSchema.java:105) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:54) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:36) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-files-1.16.2.jar:1.16.2]
at org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:109) ~[flink-sql-connector-pulsar-1.16.0.0.jar:1.16.0.0]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.2.jar:1.16.2]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
I'm using the Flink SQL client to test the Flink Pulsar integration. The underlying Pulsar topic has a very simple Avro schema (shown below). Here is what I did:
I double-checked the Pulsar topic schema and confirmed that it is Avro:
localhost(localhost)> admin schemas get public/default/tp_simavr_2
{
  "version": 0,
  "schemaInfo": {
    "name": "tp_simavr_2",
    "schema": {
      "type": "record",
      "name": "Person",
      "fields": [
        {
          "name": "fname",
          "type": "string",
          "nullable": false
        },
        {
          "name": "lname",
          "type": "string",
          "nullable": false
        }
      ]
    },
    "type": "AVRO",
    "timestamp": 1690382920466,
    "properties": {}
  }
}
I double-checked the Flink table creation statement, and its format is Avro as well:
Flink SQL> show create table tp_simavr_2;
CREATE TABLE `pulsar_cat_local`.`default_database`.`tp_simavr_2` (
  `fname` VARCHAR(20),
  `lname` VARCHAR(20)
) WITH (
  'explicit' = 'true',
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/tp_simavr_2',
  'format' = 'avro',
  'admin-url' = 'http://localhost:8080',
  'service-url' = 'pulsar://localhost:6650'
)
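One difference between the two definitions that may matter: the Pulsar schema declares both fields as plain, non-nullable "string", while the Flink columns are declared without NOT NULL. The stack trace fails in ResolvingDecoder.readIndex with "length 2", which is consistent with the Flink side expecting a two-branch union per field. The sketch below is only an illustration under that assumption (a ["null", "string"] union with the string as branch 1). It hand-encodes Avro binary for both layouts to show how the bytes differ; the values "John" and "Doe" are made-up sample data, not taken from the topic.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro binary does: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag mapping for values in the 64-bit range
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)  # more bytes follow
        else:
            out.append(b)
            return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: zigzag-varint byte length followed by UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_plain(fname: str, lname: str) -> bytes:
    # Layout for the Pulsar schema above: two plain "string" fields.
    return encode_string(fname) + encode_string(lname)


def encode_nullable(fname: str, lname: str) -> bytes:
    # Assumed reader-side layout: each field is ["null", "string"],
    # so every value is prefixed with the union branch index (1 = string).
    return b"".join(zigzag_varint(1) + encode_string(v) for v in (fname, lname))


plain = encode_plain("John", "Doe")        # hypothetical sample values
nullable = encode_nullable("John", "Doe")
print(plain.hex(" "))
print(nullable.hex(" "))
```

If the reader expects the second layout but receives the first, it would interpret the first byte of the plain encoding (the string length) as a union branch index, which is one way to end up with an out-of-range index.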
When I try to run a simple select statement, I get the following error:
Flink SQL> select * from tp_simavr_2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ArrayIndexOutOfBoundsException: Index -62 out of bounds for length 2
Purely on the Pulsar side, I can produce and consume the Avro messages without problems. On the Flink side, however, I can't get it to work. Any idea where the problem could be?
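A note on the specific value in the error: Avro binary encodes ints, longs, string lengths, and union branch indexes as zigzag varints. The sketch below decodes such a value and searches for the single byte that would yield -62. This is a diagnostic aid only, and it assumes the failing read happened on a single-byte varint at the start of a field.

```python
def read_zigzag_varint(buf: bytes, pos: int = 0):
    """Decode one Avro zigzag varint from buf; returns (value, next_position)."""
    shift = 0
    raw = 0
    while True:
        b = buf[pos]
        pos += 1
        raw |= (b & 0x7F) << shift
        if not (b & 0x80):  # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


# Which single-byte varints decode to the index reported in the error?
candidates = [b for b in range(128) if read_zigzag_varint(bytes([b]))[0] == -62]
print(candidates)                     # [123]
print([chr(b) for b in candidates])   # ['{']
```

The only single byte that decodes to -62 is 0x7B, which is the ASCII character "{". A valid Avro string length is never negative, so this byte is unlikely to be the start of an Avro-binary string. That suggests, without proving it, that the message payload Flink received may be something other than raw Avro binary (for example JSON text), so it may be worth checking how the producer actually serialized the messages.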