Reputation: 357
DirectRunner gets the job done without issue, but Dataflow consistently fails: it is unable to delete, and then unable to rename, the temporary files it writes to the bucket.
This is the exact same code, exact same bucket, I only changed the runners:
public static DataflowPipelineOptions setOptions() {
    // Placeholder strings below anonymize real values.
    String bucketPath = "gs://mybucketpath/";
    String region = "{REGION}";
    String serviceAccountEmail = "{SERVICE_ACCOUNT_EMAIL}";
    String subNetwork = "{SUBNETWORK}";

    List<String> experiments = new ArrayList<String>(); // experiments must be a MUTABLE list
    experiments.add("use_runner_v2");

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class); // commented out for the DirectRunner run
    options.setProject("{PROJECT ID}");
    options.setJobName("takt-dataflow-test");
    options.setTempLocation(bucketPath + "temp/");
    options.setStagingLocation(bucketPath + "staging/");
    options.setRegion(region);
    options.setServiceAccount(serviceAccountEmail);
    options.setExperiments(experiments);
    options.setSubnetwork(subNetwork);
    return options;
}
public static void main(String[] args) {
    String bucketPath = "gs://mybucketpath/";
    DataflowPipelineOptions options = setOptions();
    Pipeline p = Pipeline.create(options);

    String inputPath = bucketPath + "input/test.csv";
    String outputPath = bucketPath + "output/";

    PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from(inputPath));
    lines.apply("WriteMyFile", TextIO.write().to(outputPath + "test.txt"));
    p.run().waitUntilFinish();
}
The placeholder values in braces are me anonymizing things that are actually plain strings. bucketPath is written straight out in setOptions, which is why I don't bother to pass it in.
The only difference between the DirectRunner and DataflowRunner versions is whether the setRunner line is commented out. DirectRunner works perfectly; Dataflow fails because it can't delete the temp files from the bucket.
Thanks!
Upvotes: 1
Views: 1729
Reputation: 521
This looks like a bucket object permission error. Check the service account and verify that it has the right permissions.
If your bucket is in the same project as your Dataflow job, you will need to grant the following permissions to your service account:
storage.objects.create
storage.objects.delete
storage.objects.get
storage.objects.getIamPolicy
storage.objects.list
Otherwise, you will need to grant the object access permissions to your service account in the bucket's project. This process is detailed here; a bucket-level alternative is also sketched after the commands below.
To grant the permissions, you can define them as a custom role and assign that role to the service account:
gcloud iam roles create objectEditor \
    --project={PROJECT_ID} \
    --permissions=storage.objects.create,storage.objects.delete,storage.objects.get,storage.objects.getIamPolicy,storage.objects.list

gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member=serviceAccount:{SERVICE_ACC_EMAIL} \
    --role=projects/{PROJECT_ID}/roles/objectEditor
(A custom role has to be referenced by its full name in the binding, hence projects/{PROJECT_ID}/roles/objectEditor.)
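For the cross-project case, a bucket-level grant is another option. A minimal sketch, assuming {BUCKET_NAME} stands in for your bucket and that the predefined roles/storage.objectAdmin role (which should cover all five permissions above) is acceptable:
gsutil iam ch serviceAccount:{SERVICE_ACC_EMAIL}:roles/storage.objectAdmin gs://{BUCKET_NAME}
This binds the role on the bucket itself, so nothing has to change in the bucket project's IAM policy.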
Upvotes: 2
Reputation: 310
I faced the same issue. Updating the Service Account permissions fixed it; try granting these, it may work for you too (a command to check the account's current roles follows the list):
storage.objects.create
storage.objects.delete
storage.objects.get
storage.objects.getIamPolicy
storage.objects.list
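Before changing anything, it can help to confirm which roles the service account currently holds. A sketch, assuming {PROJECT_ID} and {SERVICE_ACC_EMAIL} are replaced with your real values:
gcloud projects get-iam-policy {PROJECT_ID} \
    --flatten="bindings[].members" \
    --filter="bindings.members:serviceAccount:{SERVICE_ACC_EMAIL}" \
    --format="table(bindings.role)"
Note this lists only project-level bindings; roles granted directly on a bucket won't show up here.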
Upvotes: 1