Gayan Weerakutti

Authenticating with Google Cloud from Apache Beam application via code

I'm trying to run an Apache Beam application in Kinesis Data Analytics, which uses Apache Flink as the runtime. The pipeline uses the PubsubIO connector. Since Kinesis Data Analytics does not allow exporting environment variables, setting the GOOGLE_APPLICATION_CREDENTIALS environment variable doesn't seem to be an option, so I'm trying to authenticate with Google Cloud in code instead.

I'm trying to authenticate using code as below.

    GoogleCredentials credential = GoogleCredentials
            .fromStream(credentialJsonInputStream)
            .createScoped("https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/pubsub");
    credential.refreshIfExpired();

    options.setGcpCredential(credential);

The options interface used here extends PubsubOptions.

However, when I run the application, it fails with the following exception:

    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
    POST https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish
    {
      "code" : 403,
      "errors" : [ {
        "domain" : "global",
        "message" : "The request is missing a valid API key.",
        "reason" : "forbidden"
      } ],
      "message" : "The request is missing a valid API key.",
      "status" : "PERMISSION_DENIED"
    }
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
        at com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)

While debugging, I noticed that calling GcpOptions#getGcpCredential on the PubsubOptions reference passed to org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClient returns null.

I'd really appreciate any insights on how to authenticate in this scenario.

Answers (1)

Gayan Weerakutti

The GcpOptions#setGcpCredential option can't be used with the Flink runner. The Flink runner serializes PipelineOptions, but getGcpCredential is annotated with @JsonIgnore, so the credential is not serialized and is missing from the options the workers receive.
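To see why the credential disappears, here is a minimal, stdlib-only model of that round trip. It is not Beam code: Beam serializes PipelineOptions to JSON with Jackson and skips @JsonIgnore properties, whereas this sketch uses Java serialization, with the `transient` keyword standing in for @JsonIgnore. `OptionsModel`, `OptionsRoundTrip`, and the class name `com.example.CustomCredentialFactory` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a PipelineOptions object (not Beam's API).
class OptionsModel implements Serializable {
  private static final long serialVersionUID = 1L;

  // Plays the role of getGcpCredential (@JsonIgnore): excluded from serialization.
  transient Object gcpCredential;

  // Plays the role of credentialFactoryClass: a plain value that is serialized.
  String credentialFactoryClassName;
}

public class OptionsRoundTrip {

  // Serializes and deserializes the options, as a runner does when shipping them to workers.
  static OptionsModel roundTrip(OptionsModel options) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(options);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (OptionsModel) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    OptionsModel options = new OptionsModel();
    options.gcpCredential = new Object(); // set on the client, like setGcpCredential(...)
    options.credentialFactoryClassName = "com.example.CustomCredentialFactory";

    OptionsModel onWorker = roundTrip(options);
    System.out.println(onWorker.gcpCredential);              // prints null
    System.out.println(onWorker.credentialFactoryClassName); // prints com.example.CustomCredentialFactory
  }
}
```

The explicit credential does not survive, while the factory class name does, which is what the workaround below relies on.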

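When no credential has been set explicitly via GcpOptions#setGcpCredential, GCP services such as Pub/Sub use a credential created by the factory class configured in GcpOptions#credentialFactoryClass. By default that is GcpCredentialFactory, which loads Application Default Credentials (for example from GOOGLE_APPLICATION_CREDENTIALS) and returns null if none are found, which matches the null credential observed in the question.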
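The fallback can be modeled roughly as follows. This is a simplified, hypothetical sketch of the behavior of Beam's GcpOptions.GcpUserCredentialsFactory, not its actual source: if no credential was set explicitly, it looks up a static `fromOptions` method on the configured factory class, calls it reflectively, and asks the resulting factory for a credential. `CredentialSource`, `FakeFactory`, and `CredentialResolver` are illustrative names, and `Object` stands in for Beam's PipelineOptions and Credentials types.

```java
import java.lang.reflect.Method;

// Hypothetical counterpart of Beam's CredentialFactory interface.
interface CredentialSource {
  Object getCredential();
}

// Hypothetical user-supplied factory; only its class (not an instance) is configured.
class FakeFactory implements CredentialSource {
  private FakeFactory() {}

  // Beam looks up a static method with this name; there the argument is a PipelineOptions.
  public static FakeFactory fromOptions(Object options) {
    return new FakeFactory();
  }

  @Override
  public Object getCredential() {
    return "credential-from-FakeFactory";
  }
}

public class CredentialResolver {

  // Returns the explicit credential if present, otherwise builds one from the factory class.
  static Object resolve(Object explicitCredential,
                        Class<? extends CredentialSource> factoryClass,
                        Object options) {
    if (explicitCredential != null) {
      return explicitCredential;
    }
    try {
      Method fromOptions = factoryClass.getMethod("fromOptions", Object.class);
      CredentialSource factory = (CredentialSource) fromOptions.invoke(null, options);
      return factory.getCredential();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + factoryClass.getName(), e);
    }
  }

  public static void main(String[] args) {
    // On a worker the explicit credential is null, so the factory class is used.
    System.out.println(resolve(null, FakeFactory.class, new Object()));
  }
}
```

Because only a class reference has to reach the worker, the factory can run there and build a fresh credential locally.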

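So instead of calling options.setGcpCredential(credential), we can define a custom credential factory (a class implementing Beam's CredentialFactory interface) and pass that class to GcpOptions#setCredentialFactoryClass. Unlike the credential object, the factory class is an ordinary serializable option, so it reaches the workers, and each worker builds its own credential from it: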

    options.setCredentialFactoryClass(CustomCredentialFactory.class);

For this call to compile, your application's PipelineOptions interface must extend GcpOptions. PubsubOptions already extends GcpOptions, so an options interface that extends PubsubOptions, as in the question, works as is.

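    import com.google.auth.Credentials;
    import com.google.auth.oauth2.GoogleCredentials;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
    import org.apache.beam.sdk.options.PipelineOptions;

    public class CustomCredentialFactory implements CredentialFactory {

      private static final List<String> SCOPES = Arrays.asList(
          "https://www.googleapis.com/auth/cloud-platform",
          "https://www.googleapis.com/auth/pubsub");

      private CustomCredentialFactory(PipelineOptions options) {}

      /**
       * Required by GcpOptions.GcpUserCredentialsFactory#create(PipelineOptions), which
       * instantiates the configured factory class through this static method.
       */
      public static CustomCredentialFactory fromOptions(PipelineOptions options) {
        return new CustomCredentialFactory(options);
      }

      @Override
      public Credentials getCredential() throws IOException {
        // Load the GCP credential file (from S3, the JAR, ...).
        // SomeUtil.getCredentialInputStream() stands for your own loading code.
        // An IOException propagates instead of returning null, so a missing or
        // unreadable key fails fast rather than producing unauthenticated requests.
        try (InputStream credentialFileInputStream = SomeUtil.getCredentialInputStream()) {
          return GoogleCredentials
              .fromStream(credentialFileInputStream)
              .createScoped(SCOPES);
        }
      }
    }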
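The answer leaves `SomeUtil.getCredentialInputStream()` to the reader. One possible implementation, assuming the service-account key is packaged inside the application JAR under the hypothetical resource name `gcp-credentials.json`, loads it from the classpath and fails with a clear message when it is missing. `CredentialFiles` is an illustrative name; loading from S3 instead would need the AWS SDK and is not shown.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper; the class and resource names are illustrative, not part of Beam.
public final class CredentialFiles {

  // Assumed location of the service-account key inside the application JAR.
  static final String DEFAULT_RESOURCE = "gcp-credentials.json";

  private CredentialFiles() {}

  public static InputStream getCredentialInputStream() throws IOException {
    return open(DEFAULT_RESOURCE);
  }

  // Opens a classpath resource; the caller is responsible for closing the stream.
  static InputStream open(String resourceName) throws IOException {
    InputStream in = CredentialFiles.class.getClassLoader().getResourceAsStream(resourceName);
    if (in == null) {
      // Report the missing key explicitly instead of passing a null stream to GoogleCredentials.fromStream.
      throw new FileNotFoundException("Credential resource not found on classpath: " + resourceName);
    }
    return in;
  }
}
```

Used from the factory's `getCredential()`, the returned stream is closed by the try-with-resources block there.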
