I'm trying to run an Apache Beam application in Kinesis Data Analytics, which uses Apache Flink as the runtime. The pipeline uses the PubsubIO connector. Kinesis Data Analytics does not allow exporting environment variables, so setting the GOOGLE_APPLICATION_CREDENTIALS environment variable doesn't seem to be an option. Instead, I'm trying to authenticate with Google Cloud in code.
This is how I authenticate in code:
GoogleCredentials credential = GoogleCredentials
.fromStream(credentialJsonInputStream)
.createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
credential.refreshIfExpired();
options.setGcpCredential(credential);
The options reference here is of a type that extends PubsubOptions.
But when I run the application, it fails with this exception:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "The request is missing a valid API key.",
    "reason" : "forbidden"
  } ],
  "message" : "The request is missing a valid API key.",
  "status" : "PERMISSION_DENIED"
}
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
	at com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)
While debugging, I noticed that calling GcpOptions#getGcpCredential on the PubsubOptions reference passed to org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClient returns null.
I'd really appreciate any insights on how to authenticate in this scenario.
The GcpOptions#setGcpCredential option can't be used with the Flink runner: the Flink runner serializes PipelineOptions, and getGcpCredential is annotated with @JsonIgnore, so a credential set this way is not part of the serialized options.
When no credential has been set explicitly via GcpOptions#setGcpCredential, GCP services such as Pub/Sub use a credential based on the currently set GcpOptions#credentialFactoryClass.
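To make the difference concrete, here is a deliberately simplified, stdlib-only model of that round trip. It is a sketch under stated assumptions, not Beam code: Options, serialize, deserialize, DefaultCredentialFactory and KeyFileCredentialFactory are all hypothetical stand-ins. The credential field plays the role of the @JsonIgnore'd getGcpCredential (never written out), while credentialFactoryClass is a plain class name that survives serialization and is instantiated by name on the other side.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the options round trip described above.
// These are NOT Beam classes; they only mimic the relevant behavior.
public class OptionsRoundTripSketch {

    // Stand-in for a credential factory: builds a credential on demand.
    public interface CredentialFactory {
        String getCredential();
    }

    // Mimics a default factory that finds no application default credential
    // (e.g. GOOGLE_APPLICATION_CREDENTIALS is not set), so it yields null.
    public static class DefaultCredentialFactory implements CredentialFactory {
        @Override
        public String getCredential() {
            return null;
        }
    }

    // Mimics a custom factory that loads the key file itself (hypothetical).
    public static class KeyFileCredentialFactory implements CredentialFactory {
        @Override
        public String getCredential() {
            return "credential-loaded-by-factory";
        }
    }

    public static class Options {
        // Like getGcpCredential(): ignored by the serializer, so never shipped.
        public String credential;
        // Like credentialFactoryClass: a plain class name, so it survives.
        public String credentialFactoryClass = DefaultCredentialFactory.class.getName();

        // Use an explicitly set credential if present; otherwise instantiate
        // the configured factory class by name and ask it for one.
        public String getCredential() {
            if (credential == null) {
                try {
                    CredentialFactory factory = (CredentialFactory) Class
                            .forName(credentialFactoryClass)
                            .getDeclaredConstructor()
                            .newInstance();
                    credential = factory.getCredential();
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
            return credential;
        }
    }

    // Writes only the serializable property, as happens with a @JsonIgnore getter.
    public static Map<String, String> serialize(Options o) {
        Map<String, String> json = new HashMap<>();
        json.put("credentialFactoryClass", o.credentialFactoryClass);
        return json;
    }

    public static Options deserialize(Map<String, String> json) {
        Options o = new Options();
        o.credentialFactoryClass = json.get("credentialFactoryClass");
        return o;
    }

    public static void main(String[] args) {
        // Approach from the question: set the credential object directly.
        Options a = new Options();
        a.credential = "explicit-credential";
        System.out.println(deserialize(serialize(a)).getCredential()); // prints null

        // Approach from this answer: point the options at a factory class.
        Options b = new Options();
        b.credentialFactoryClass = KeyFileCredentialFactory.class.getName();
        System.out.println(deserialize(serialize(b)).getCredential()); // prints credential-loaded-by-factory
    }
}
```

The design point the model tries to capture is that only data that survives serialization (here, a class name) can carry the authentication setup across to the deserialized copy; the credential object itself has to be rebuilt there.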
So instead of calling options.setGcpCredential(credential), we can define a custom GcpCredentialFactory subclass and pass it to GcpOptions#credentialFactoryClass:
options.setCredentialFactoryClass(CustomCredentialFactory.class);
Your application's PipelineOptions interface must extend the GcpOptions interface for you to be able to call the above method on your options reference.
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.IOException;
import java.io.InputStream;
import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
import org.apache.beam.sdk.options.PipelineOptions;

public class CustomCredentialFactory extends GcpCredentialFactory {

    private CustomCredentialFactory(PipelineOptions o) { }

    /**
     * Required by GcpOptions.GcpUserCredentialsFactory#create(org.apache.beam.sdk.options.PipelineOptions)
     */
    public static CustomCredentialFactory fromOptions(PipelineOptions o) {
        return new CustomCredentialFactory(o);
    }

    @Override
    public Credentials getCredential() {
        // Load the GCP credential file (from S3, Jar, ..); SomeUtil is a placeholder.
        try (InputStream credentialFileInputStream = SomeUtil.getCredentialInputStream()) {
            return GoogleCredentials
                    .fromStream(credentialFileInputStream)
                    .createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
        } catch (IOException e) {
            // Returning null leaves the pipeline without a credential, which surfaces
            // later as a 403 from Pub/Sub; consider logging or rethrowing instead.
            return null;
        }
    }
}
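SomeUtil.getCredentialInputStream() above is a placeholder. As one hypothetical way to fill it in for the "Jar" option, assuming the service-account key file is packaged inside the application JAR under an assumed resource name, a classpath lookup could look like the sketch below. CredentialResources and DEFAULT_RESOURCE are illustrative names, not part of any library.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper that could stand in for SomeUtil.getCredentialInputStream():
// reads a service-account key file bundled inside the application JAR.
public final class CredentialResources {

    // Assumed resource path; adjust to wherever the key file is packaged.
    public static final String DEFAULT_RESOURCE = "/gcp-credentials.json";

    private CredentialResources() { }

    public static InputStream getCredentialInputStream() throws IOException {
        return getCredentialInputStream(DEFAULT_RESOURCE);
    }

    // Fails loudly instead of returning null, so a missing key file shows up
    // as a clear error rather than as a later authentication failure.
    public static InputStream getCredentialInputStream(String resource) throws IOException {
        InputStream in = CredentialResources.class.getResourceAsStream(resource);
        if (in == null) {
            throw new FileNotFoundException("Credential resource not found on classpath: " + resource);
        }
        return in;
    }
}
```

Because getResourceAsStream returns null for a missing resource, the explicit FileNotFoundException makes the failure mode obvious; the caller's try-with-resources then closes the stream once GoogleCredentials.fromStream has read it.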