chintootech

Azure Blob Storage with Apache Flink 1.10

I am trying to use Azure Blob Storage with Apache Flink 1.10 for checkpointing purposes.

I followed all the instructions in the Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/azure.html

Step 1:

mkdir ./plugins/azure-fs-hadoop
cp ./opt/flink-azure-fs-hadoop-1.10.0.jar ./plugins/azure-fs-hadoop/
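Flink loads a pluggable file system only when its jar sits in its own folder under plugins/. It can therefore be worth confirming that layout before looking elsewhere. Below is a minimal JDK-only sketch. PluginLayoutCheck and its method are hypothetical helpers, not part of Flink.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink: checks that a plugin jar is located at
// <flink-home>/plugins/<plugin-folder>/<jar-name>.
class PluginLayoutCheck {

    static boolean isInstalled(Path flinkHome, String pluginFolder, String jarName) {
        // Each plugin needs its own folder; a jar placed directly in plugins/ does not count here.
        Path jar = flinkHome.resolve("plugins").resolve(pluginFolder).resolve(jarName);
        return Files.isRegularFile(jar);
    }

    public static void main(String[] args) {
        // Assumes the Flink home directory is passed as the first argument (defaults to ".").
        Path flinkHome = Path.of(args.length > 0 ? args[0] : ".");
        boolean ok = isInstalled(flinkHome, "azure-fs-hadoop", "flink-azure-fs-hadoop-1.10.0.jar");
        System.out.println(ok ? "Azure plugin found" : "Azure plugin missing");
    }
}
```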

Step 2:

This is what I have in flink-conf.yaml:

#Azure Storage Key
fs.azure.account.key.<storage-account>.blob.core.windows.net:xxxxxxxxxxxxxxxxxx
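Flink reads flink-conf.yaml as a flat list of "key: value" lines, with "#" starting a comment. Under that format, a setting can be silently dropped in two ways:

- It ends up on the same line as a comment.
- Its colon is not followed by a space.

The sketch below only mimics that line format with the JDK to show the effect. It is not Flink's actual parser, and the class name and sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch, not Flink's parser: reads flat "key: value" lines and
// treats everything after '#' as a comment.
class FlinkConfLineSketch {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            // Drop the comment part; a line starting with '#' is ignored entirely.
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue;
            }
            // Assumed separator: a colon followed by a space.
            String[] kv = content.split(": ", 2);
            if (kv.length < 2) {
                System.err.println("Skipping line without 'key: value': " + line);
                continue;
            }
            conf.put(kv[0].trim(), kv[1].trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        // Placeholder account names and keys.
        Map<String, String> conf = parse(List.of(
            "# Azure Storage Key",
            "fs.azure.account.key.myaccount.blob.core.windows.net: secret",
            "#Azure Storage Key fs.azure.account.key.other.blob.core.windows.net: lost",
            "fs.azure.account.key.third.blob.core.windows.net:nospace"));
        conf.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Only the first account key survives in this sketch. The second line is all comment, and the third has no space after the colon.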

Step 3:

Use Azure Blob Storage for checkpointing.

This is what I have in my Flink job:

final StateBackend stateBackend = new FsStateBackend("wasb://flink-blob@$<storage-account>.blob.core.windows.net/checkpoint");
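A WASB path has the form wasb://<container>@<account>.blob.core.windows.net/<path> (wasbs:// for SSL). Splitting the string with java.net.URI before passing it to FsStateBackend is a cheap way to catch typos. For example, a leftover "$" in front of the account name leaves java.net.URI unable to find a host. Below is a minimal JDK-only sketch. WasbUriCheck is a hypothetical helper, and its checks are assumptions about a well-formed path, not Flink's own validation.

```java
import java.net.URI;

// Hypothetical helper: splits wasb[s]://<container>@<account>.blob.core.windows.net/<path>
// into container and account, rejecting strings that do not match that shape.
class WasbUriCheck {

    static final String BLOB_SUFFIX = ".blob.core.windows.net";

    static String[] containerAndAccount(String uriString) {
        // URI.create throws IllegalArgumentException for syntactically invalid URIs.
        URI uri = URI.create(uriString);
        String scheme = uri.getScheme();
        if (!"wasb".equals(scheme) && !"wasbs".equals(scheme)) {
            throw new IllegalArgumentException("Expected wasb or wasbs scheme: " + uriString);
        }
        // getHost() and getUserInfo() return null when the authority cannot be parsed
        // as <user-info>@<host>, e.g. because of a stray '$' before the account name.
        String host = uri.getHost();
        String container = uri.getUserInfo();
        if (host == null || container == null || !host.endsWith(BLOB_SUFFIX)) {
            throw new IllegalArgumentException("Expected <container>@<account>" + BLOB_SUFFIX + ": " + uriString);
        }
        String account = host.substring(0, host.length() - BLOB_SUFFIX.length());
        return new String[] {container, account};
    }

    public static void main(String[] args) {
        // "myaccount" is a placeholder storage account name.
        String[] parts = containerAndAccount("wasb://flink-blob@myaccount.blob.core.windows.net/checkpoint");
        System.out.println("container=" + parts[0] + ", account=" + parts[1]);
    }
}
```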

I am not sure whether I missed anything, but when I submit the job I get the exception below (an AskTimeoutException from the dispatcher actor):

    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at com.example.flink.checkpointing.CheckpointExample.main(CheckpointExample.java:78)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
    ... 17 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#75666936]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
    at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
    at java.base/java.lang.Thread.run(Thread.java:834)

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
    ... 4 more

Answers (2)

Amit Shahi

That's correct. You must set the storage account key and point state.checkpoints.dir and state.savepoints.dir at the container, using the following formats:

fs.azure.account.key.<storage-account-name>.blob.core.windows.net: <key>

wasbs://<container>@<storage-account-name>.blob.core.windows.net/<directory>/
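Putting both formats together, the relevant flink-conf.yaml entries can be generated from a few inputs. The sketch below is hypothetical: AzureFlinkConf is not a Flink class, and the checkpoint and savepoint folder names are placeholders. The name checks encode Azure's naming rules:

- Storage accounts: 3 to 24 lowercase letters and digits.
- Containers: 3 to 63 lowercase letters, digits and single hyphens.

Writing each entry as "key: value" assumes Flink's flat key/value format.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: builds the flink-conf.yaml lines for
// checkpointing and savepointing to Azure Blob Storage over wasbs://.
class AzureFlinkConf {

    // Storage account names: 3-24 characters, lowercase letters and digits only.
    private static final Pattern ACCOUNT = Pattern.compile("[a-z0-9]{3,24}");
    // Container names: lowercase letters and digits, single hyphens only between them
    // (the 3-63 character length limit is checked separately).
    private static final Pattern CONTAINER = Pattern.compile("[a-z0-9]+(-[a-z0-9]+)*");

    static List<String> confLines(String account, String key, String container,
                                  String checkpointDir, String savepointDir) {
        if (!ACCOUNT.matcher(account).matches()) {
            throw new IllegalArgumentException("Invalid storage account name: " + account);
        }
        if (container.length() < 3 || container.length() > 63 || !CONTAINER.matcher(container).matches()) {
            throw new IllegalArgumentException("Invalid container name: " + container);
        }
        String host = account + ".blob.core.windows.net";
        String base = "wasbs://" + container + "@" + host + "/";
        return List.of(
            "fs.azure.account.key." + host + ": " + key,
            "state.checkpoints.dir: " + base + checkpointDir + "/",
            "state.savepoints.dir: " + base + savepointDir + "/");
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own account, key and container.
        confLines("myaccount", "<key>", "flink-blob", "checkpoints", "savepoints")
            .forEach(System.out::println);
    }
}
```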

Till Rohrmann

I think your problem is twofold. The true failure cause is hidden by the AskTimeoutException. The timeout value is too aggressive, so a long-running job submission fails on the client side before the actual error is reported. This has been fixed by FLINK-16018, which will be released with Flink 1.10.1.
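The effect described here, a client-side timeout hiding the server's real error, can be reproduced with plain JDK futures. This is an analogy only, not Flink's submission code. TimeoutMasksCause, the delays and the error message are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Analogy only (plain JDK, not Flink): a slow operation that eventually fails with a
// meaningful error. A caller that gives up too early sees only a timeout.
class TimeoutMasksCause {

    static CompletableFuture<String> slowSubmission(long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis); // stands in for a long-running submission
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Stands in for the real server-side failure, e.g. a misconfigured file system.
            throw new IllegalStateException("real cause: file system misconfigured");
        });
    }

    // Returns what a caller observes when it waits at most timeoutMillis.
    static String observe(long delayMillis, long timeoutMillis) {
        try {
            return slowSubmission(delayMillis).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(observe(500, 50));   // the caller gives up first
        System.out.println(observe(50, 5000));  // the caller waits long enough
    }
}
```

With the short timeout the caller only learns that something was slow. Given enough time, it sees the actual cause, which is why a less aggressive submission timeout lets the real error surface.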

To find the true failure cause, I recommend looking at Flink's jobmanager.log, which should contain information about what went wrong. I suspect the Azure Blob Storage is misconfigured.
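When reading jobmanager.log, the innermost "Caused by:" entry of a Java stack trace usually names the root failure. Below is a small JDK-only sketch to pull it out, with some caveats:

- RootCauseFinder is hypothetical.
- The log path depends on how Flink is deployed.
- If the log holds several stack traces, this simply returns the last "Caused by:" line in the file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: returns the last "Caused by:" line of a log excerpt,
// which for a single Java stack trace is the innermost (root) cause.
class RootCauseFinder {

    static Optional<String> lastCausedBy(List<String> logLines) {
        String last = null;
        for (String line : logLines) {
            String trimmed = line.trim();
            if (trimmed.startsWith("Caused by:")) {
                last = trimmed;
            }
        }
        return Optional.ofNullable(last);
    }

    public static void main(String[] args) throws IOException {
        // The log location is an assumption; pass the actual JobManager log file as an argument.
        Path log = Path.of(args.length > 0 ? args[0] : "log/jobmanager.log");
        System.out.println(lastCausedBy(Files.readAllLines(log)).orElse("no 'Caused by:' line found"));
    }
}
```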
