jhan

Reputation: 11

Dataflow Flex Template validation failing with no reason given

I have been writing a Dataflow pipeline and am using Flex Templates.

My code reads from Avro and processes it with no problem, but when it comes to either WriteToAvro or WriteToText, the Dataflow job fails, and it looks like it fails at template validation. I get no reason for this at all.

I have tried a few things: removing the parameter for the output file and hard-coding it instead, and switching WriteToAvro out for WriteToText, but it fails just the same.

    import apache_beam as beam
    from apache_beam.io.avroio import ReadFromAvro, WriteToAvro

    with beam.Pipeline(options=options) as p:
        # Read the input Avro file into a PCollection of dicts.
        read_from_avro = p \
                         | 'ReadFromAvro' >> ReadFromAvro(input_file)

        # Custom transform that redacts the given fields.
        redact_data = read_from_avro | "RedactData" >> IdentifyRedactData(project, redact_fields)

        # Write the redacted records back out as Avro; this is the step that fails.
        redact_data | 'WriteToAvro' >> WriteToAvro(
                        file_path_prefix=output_file,
                        schema=s,
                        codec='deflate',
                        file_name_suffix='.avro')

The output of the RedactData step is a PCollection, with each element being a dictionary.
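
For context, `s` above is the Avro schema those dictionary elements are written against. A minimal sketch of how it could be built, assuming the fastavro-backed writer is used; the record and field names here are placeholders, not the real ones:

    from fastavro.schema import parse_schema

    # Hypothetical schema matching dict elements like {'id': '123', 'email': 'a@b.com'}.
    s = parse_schema({
        'type': 'record',
        'name': 'RedactedRecord',
        'fields': [
            {'name': 'id', 'type': 'string'},
            {'name': 'email', 'type': 'string'},
        ],
    })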

The Dataflow logs give this:

    2021-06-27 09:04:46.728 BST Workflow failed.
    2021-06-27 09:04:46.763 BST Cleaning up.
    2021-06-27 09:04:46.817 BST Worker pool stopped.

Does anyone know what's going on? FYI, when I remove the last step and run only up to the 'RedactData' step, it all runs smoothly. It is the final write step that breaks.

Edit: adding the requirements file.

apache-beam==2.29.0
google-cloud==0.34.0
google-cloud-dlp==3.1.0
google-cloud-storage==1.35.0
google-cloud-core==1.4.1
google-cloud-datastore==1.15.0

If I try to use apache-beam[gcp]==2.29.0, the build fails, so I am wondering if it could be something to do with that:

    apache-beam[gcp] 2.29.0 depends on google-cloud-dlp<2 and >=0.12.0; extra == "gcp"
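
For reference, the google-cloud-dlp==3.1.0 pin above is what conflicts with that constraint; something like this would satisfy the resolver, assuming my DLP calls still work against the pre-2.0 client API (the 2.x releases changed the client's call signatures):

apache-beam[gcp]==2.29.0
google-cloud-dlp==1.0.0
google-cloud-storage==1.35.0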

Upvotes: 0

Views: 406

Answers (2)

jhan

Reputation: 11

Fixed. I think the issue stemmed from the pipeline options not being configured properly. I also changed how the pipeline is run, based on the Flex Template wordcount example.

    import apache_beam as beam
    from apache_beam.io.avroio import ReadFromAvro, WriteToAvro
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    # Build the options from the leftover command-line args and save the main
    # session so that global imports are available on the workers.
    options = PipelineOptions(beam_args)
    options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=options)

    project = options.get_all_options().get('project')

    read_from_avro = p \
                     | 'ReadFromAvro' >> ReadFromAvro(input_file)

    redact_data = read_from_avro | "RedactData" >> IdentifyRedactData(project, redact_fields)

    redact_data | 'WriteToAvro' >> WriteToAvro(
                    file_path_prefix=output_file,
                    schema=table_schema,
                    codec='deflate')

    # Run explicitly and block until the job finishes.
    result = p.run()
    result.wait_until_finish()
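
For completeness, `beam_args` above is whatever argparse does not recognise, roughly following the wordcount sample; the --input_file and --output_file names here are placeholders for my actual template parameters:

    import argparse

    parser = argparse.ArgumentParser()
    # Template-specific parameters; anything unrecognised is handed to Beam.
    parser.add_argument('--input_file', required=True)
    parser.add_argument('--output_file', required=True)
    known_args, beam_args = parser.parse_known_args()

    input_file = known_args.input_file
    output_file = known_args.output_file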

Upvotes: 1

Kenn Knowles

Reputation: 6023

From your job details, you can navigate to Cloud Logging. The default set of logs displayed may not contain the error, so I recommend changing the filters to show all logs.
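
Equivalently, you can pull the job's logs with the Cloud Logging Python client; a sketch, with PROJECT and JOB_ID as placeholders and assuming the google-cloud-logging package is installed:

    from google.cloud import logging as cloud_logging

    # Placeholders: substitute your project and the failed Dataflow job's ID.
    PROJECT = 'my-project'
    JOB_ID = 'my-dataflow-job-id'

    client = cloud_logging.Client(project=PROJECT)
    # Dataflow job logs live under the "dataflow_step" monitored resource.
    log_filter = (
        f'resource.type="dataflow_step" AND '
        f'resource.labels.job_id="{JOB_ID}"'
    )

    # List all entries for the job, newest first, without filtering by severity.
    for entry in client.list_entries(filter_=log_filter, order_by=cloud_logging.DESCENDING):
        print(entry.timestamp, entry.severity, entry.payload)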

Upvotes: 0
