dejan

Reputation: 196

Flink write to S3 with Presto

I would like to write to S3 from Flink 1.4.2 using the Presto interface and BucketingSink. I followed the instructions: I added s3.access-key and s3.secret-key to flink-conf.yaml and put flink-s3-fs-presto-1.4.2.jar in the lib folder. Below is the error that is produced.
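For reference, this is roughly the setup (the key values, the bucket name, and the stream variable are placeholders):

In flink-conf.yaml:

    s3.access-key: <your-access-key>
    s3.secret-key: <your-secret-key>

And the sink, as a minimal sketch:

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    // stream is a DataStream<String> produced earlier in the job
    BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/flink-output");
    // Optional: roll into a new bucket directory every hour
    sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
    stream.addSink(sink);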

If the job is executed in an AWS environment, I hope that I don't need to set up keys at all. Is this assumption correct?

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy17.initialize(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:91)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Upvotes: 2

Views: 2095

Answers (1)

Stephan Ewen

Reputation: 2371

The application does not seem to be using flink-s3-fs-presto at all, but rather Hadoop's deprecated old S3 file system. The stack trace you pasted indicates that flink-s3-fs-presto is not being picked up for the file system scheme 's3://'.
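A minimal sketch to check which implementation Flink resolves for the scheme, assuming the class name and bucket URI as placeholders; it is only meaningful when run with the same lib folder on the classpath as the TaskManagers:

    import java.net.URI;
    import org.apache.flink.core.fs.FileSystem;

    public class CheckS3Scheme {
        public static void main(String[] args) throws Exception {
            // Resolve the file system registered for the s3:// scheme and print
            // its class. With flink-s3-fs-presto in lib/, this should not be the
            // old org.apache.hadoop.fs.s3.S3FileSystem from the stack trace.
            FileSystem fs = FileSystem.get(new URI("s3://some-bucket/"));
            System.out.println(fs.getClass().getName());
        }
    }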

Please make sure that the flink-s3-fs-presto JAR file is really in the lib folder of the TaskManagers that execute the job, not only on the client.

  • When you use YARN or Mesos to deploy Flink jobs, that should happen automatically.
  • When you deploy Flink via containers, make sure that the JAR file is in the lib folder of your container image.
  • When you run Flink TaskManagers standalone or manually, make sure all TaskManagers in the cluster have the JAR file in the lib folder before they are started.

Upvotes: 1
