Reputation: 11
We are currently developing an Apache Beam pipeline that reads data from GCP Pub/Sub and writes the received data to a bucket in AWS S3.
We are using TextIO.write from beam-sdks-java-io-amazon-web-services to write to S3:
TextIO.write()
  .withWindowedWrites()                  // required for unbounded (streaming) input
  .withNumShards(options.getNumShards)   // explicit sharding is also required for streaming writes
  .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getTempLocation))
  .to(FileBasedSink.convertToFileResourceIfPossible(options.getOutputDirectory))
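For context, here is roughly how the whole pipeline is wired up (the subscription name and window duration below are placeholders rather than our real values; options is our custom options interface):

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{FileBasedSink, TextIO}
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
import org.joda.time.Duration

val pipeline = Pipeline.create(options)
pipeline
  .apply("ReadFromPubSub",
    PubsubIO.readStrings().fromSubscription("projects/my-project/subscriptions/my-subscription"))
  // Windowed writes need an explicit windowing strategy on the input.
  .apply("FixedWindows", Window.into[String](FixedWindows.of(Duration.standardMinutes(5))))
  .apply("WriteToS3",
    TextIO.write()
      .withWindowedWrites()
      .withNumShards(options.getNumShards)
      .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getTempLocation))
      .to(FileBasedSink.convertToFileResourceIfPossible(options.getOutputDirectory)))
pipeline.run()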
We first tested this pipeline locally using the DirectRunner, and that worked fine: data incoming from Pub/Sub was received by the pipeline and written to S3.
options.setRunner(classOf[DirectRunner])
options.setStagingLocation("./outputFolder/staging")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")
Finally, we wanted to run this pipeline on Dataflow without changing the pipeline code itself, so we only switched the runner and adjusted the pipeline options:
options.setRunner(classOf[DataflowRunner])
options.setStagingLocation("gs://my-gcs-bucket/binaries")
options.setGcpTempLocation("gs://my-gcs-bucket/temp")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")
With this setting, data is received by the pipeline from Pub/Sub, but nothing is written to S3, and no errors appear in the Dataflow logs in Stackdriver.
Does anyone know what the issue could be? Is the pipeline options config incorrect? Or is the write to S3 failing silently?
Does anyone have a suggestion on how to configure logging in beam-sdks-java-io-amazon-web-services so that it outputs DEBUG-level messages?
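For reference, we assume something like Dataflow's worker log level overrides might be the way to raise the log level for just the AWS IO package, but we are not sure this is correct (the package name below is our guess):

import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.{Level, WorkerLogLevelOverrides}

// Raise the worker log level for the AWS IO package only.
val loggingOptions = options.as(classOf[DataflowWorkerLoggingOptions])
loggingOptions.setWorkerLogLevelOverrides(
  new WorkerLogLevelOverrides().addOverrideForName("org.apache.beam.sdk.io.aws", Level.DEBUG))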
Thanks!
Upvotes: 1
Views: 1898
Reputation: 3883
To execute your pipeline using the DataflowRunner, you must set the following fields in PipelineOptions:
- project - The ID of your Google Cloud project
- runner - The pipeline runner that will parse your program and construct your pipeline.
- gcpTempLocation - A Cloud Storage path for Dataflow to stage any temporary files. You must create this bucket ahead of time, before running your pipeline.
- stagingLocation - A Cloud Storage bucket for Dataflow to stage your binary files. If you do not set this option, the value you specified for tempLocation is used for the staging location as well.
You need to specify the project option, e.g. options.setProject("my-project-id").
One important thing: if you use the Apache Beam SDK for Java 2.15.0 or later, you must also specify the region option. Please refer to the official documentation for more information.
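A minimal sketch of setting these options programmatically, in Scala to match the question (the region value is only an example):

import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions

val dataflowOptions = options.as(classOf[DataflowPipelineOptions])
dataflowOptions.setRunner(classOf[DataflowRunner])
dataflowOptions.setProject("my-project-id")   // required: your GCP project ID
dataflowOptions.setRegion("us-central1")      // required for Beam 2.15.0+; example region
dataflowOptions.setGcpTempLocation("gs://my-gcs-bucket/temp")
dataflowOptions.setStagingLocation("gs://my-gcs-bucket/binaries")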
I hope it helps.
Upvotes: 1