luca

Reputation: 65

I have an error in Dataflow: Error processing pipeline

When I run this code with DirectRunner it works, but when I run it with DataflowRunner I get this error:

ERROR:apache_beam.runners.dataflow.dataflow_runner:Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2023-04-05_02_00_47-7238238223888513941?project=<ProjectId>
Traceback (most recent call last):
File "test.py", line 51, in <module>
run()
File "test.py", line 44, in run
output | 'Write' >> WriteToText("gs://<bucket>/output/wc.txt")
File "/opt/py38/lib64/python3.8/site-packages/apache_beam/pipeline.py", line 601, in __exit__
self.result.wait_until_finish()
File "/opt/py38/lib64/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1555, in wait_until_finish
raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.

In IAM I have this rule:

The code is:

import os

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def pipelineOptions(pipeline_args):
    pipeline_options = PipelineOptions(
        pipeline_args,
        runner="DirectRunner",  # changed to "DataflowRunner" for the failing run
        project="<project-name>",
        job_name="testbigquery",
        temp_location="<temp-location>",
        region="<region>",
        )
    return pipeline_options


def run(argv=None):
    print("Start Process")
    pipeline_options = pipelineOptions(argv)
    pipeline = beam.Pipeline(options=pipeline_options)
    with pipeline as p:
        lines = p
        counts = (
                lines
                | 'Split' >> (beam.Create(["test", "fix", "test"]))
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum))
        def format_result(word, count):
            return '%s: %d' % (word, count)
        output = counts | 'Format' >> beam.MapTuple(format_result)
        output | 'Write' >> WriteToText("gs://<bucket>/output/wc.txt")

    print("End Process")


if __name__ == '__main__':
    run()

Upvotes: 0

Views: 733

Answers (1)

Poala Astrid

Reputation: 842

As mentioned by @luca, who is also the OP, the error was resolved by adding the --service_account_email parameter with the correct service account when using DataflowRunner.
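For reference, a minimal invocation sketch showing where the flag goes; the bucket, project, region, and service-account names below are placeholders, not values taken from the question:

```shell
# Submit the pipeline to Dataflow, running workers as an explicit
# service account instead of the project's default one.
python test.py \
  --runner=DataflowRunner \
  --project=<ProjectId> \
  --region=<RegionId> \
  --temp_location=gs://<bucket>/tmp \
  --service_account_email=<sa-name>@<ProjectId>.iam.gserviceaccount.com
```

The service account passed here must be usable as a Dataflow worker (roles/dataflow.worker) and must be able to read and write the temp and output buckets, otherwise the job can fail with a similarly opaque "Error processing pipeline" message.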

Posting this as a Community Wiki answer since the solution was given in the comments but never posted as an answer. This is for the benefit of community members who encounter this use case in the future.

Please feel free to edit this answer for additional information and if there are other possible workarounds/direct solutions for this use case.

Upvotes: 0
