Reputation: 65
I have an error in Dataflow: "Error processing pipeline".
When I run this code with DirectRunner it works, but when I run it with DataflowRunner I get this error:
This is the error:
ERROR:apache_beam.runners.dataflow.dataflow_runner:Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2023-04-05_02_00_47-7238238223888513941?project=<ProjectId>
Traceback (most recent call last):
  File "test.py", line 51, in <module>
    run()
  File "test.py", line 44, in run
    output | 'Write' >> WriteToText("gs://<bucket>/output/wc.txt")
  File "/opt/py38/lib64/python3.8/site-packages/apache_beam/pipeline.py", line 601, in __exit__
    self.result.wait_until_finish()
  File "/opt/py38/lib64/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1555, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.
In IAM I have this role:
The code is:
import os
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def pipelineOptions(pipeline_args):
    pipeline_options = PipelineOptions(
        pipeline_args,
        runner="DirectRunner",
        project="<project-name>",
        job_name="testbigquery",
        temp_location="<temp-location>",
        region="<region>"
    )
    return pipeline_options


def run(argv=None):
    print("Start Process")
    pipeline_options = pipelineOptions(argv)
    pipeline = beam.Pipeline(options=pipeline_options)
    with pipeline as p:
        lines = p
        counts = (
            lines
            | 'Split' >> beam.Create(["test", "fix", "test"])
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        def format_result(word, count):
            return '%s: %d' % (word, count)

        output = counts | 'Format' >> beam.MapTuple(format_result)
        output | 'Write' >> WriteToText("gs://<bucket>/output/wc.txt")
    print("End Process")


if __name__ == '__main__':
    run()
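For reference, the Split/PairWithOne/GroupAndSum steps above are a standard word count. A minimal plain-Python sketch of the same logic (not part of the original question, and not using Beam) shows what the pipeline computes before writing to GCS:

```python
from collections import Counter

# Same logic as the Beam pipeline: pair each element with 1,
# then sum the counts per key.
elements = ["test", "fix", "test"]
counts = Counter(elements)  # equivalent to Map((x, 1)) + CombinePerKey(sum)

# Mirror of format_result in the pipeline, sorted for stable output.
for word, count in sorted(counts.items()):
    print('%s: %d' % (word, count))
```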
Upvotes: 0
Views: 733
Reputation: 842
As mentioned by @luca, who is also the OP, the error was resolved by adding the --service_account_email
parameter with the correct service account when using DataflowRunner.
Posting this as a Community Wiki answer since it was answered in the comments but not posted as an answer, for the benefit of community members who might encounter this use case in the future.
Please feel free to edit this answer for additional information and if there are other possible workarounds/direct solutions for this use case.
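As a sketch of what the fixed invocation might look like, assuming the runner and other options are set in the script as in the question (the service account name below is a placeholder, not a value from this thread):

```shell
python test.py \
  --service_account_email=<worker-sa>@<ProjectId>.iam.gserviceaccount.com
```

The service account passed here becomes the identity of the Dataflow workers, so it needs the permissions required by the job (for example, write access to the gs://<bucket> output location).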
Upvotes: 0