Bernard Wong

Reputation: 1

Unable to use native Python 3 library (ElementTree) when using DataflowRunner

I am currently using Dataflow to handle/massage an XML text string from PubSub. I was able to successfully run a Dataflow job using DirectRunner for my --runner flag. However, I ran into an issue when trying to create a Dataflow resource from the exact same Dataflow job with DataflowRunner as my flag.

From the error logs (while using DataflowRunner), it seems the Dataflow template I've created does not recognize:

import xml.etree.ElementTree as ET

I receive "NameError: name 'ET' is not defined [while running 'generatedPtransform-419']" whenever I reference ET within my pipeline. What is peculiar is that my Dataflow job runs perfectly fine with DirectRunner, which leads me to believe there is an issue with how DataflowRunner builds my template, as xml.etree.ElementTree is part of the Python standard library rather than a third-party PyPI package.

For my environment, I am working with:
Python 3.7.7
apache-beam 2.22.0

Any help/guidance is much appreciated, thanks!

Working DirectRunner job:

import apache_beam as beam
import argparse, xmltodict, json
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import xml.etree.ElementTree as ET

class FormatMessage(beam.DoFn):
    def process(self, line):
        xml_msg = ET.fromstring(line)

        # Code to construct XML Object (removed) 

        tree_new_xml = ET.ElementTree(element_msg)
        xml_dict = xmltodict.parse(ET.tostring(tree_new_xml.getroot(), encoding='utf8'))
        json_obj = json.dumps(xml_dict).encode('utf8')

        yield json_obj

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic', help='Input topic to read data from.', default='')
    parser.add_argument('--output_topic', help='Output topic to write data to.', default='')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Create and implement PubSub-to-PubSub pipeline
    p = beam.Pipeline(options=pipeline_options)
    (p
     | "Read PubSub Message" >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
     | "Format Msg" >> beam.ParDo(FormatMessage())
     | "Write PubSub Output" >> beam.io.WriteToPubSub(known_args.output_topic)
     )
    p.run().wait_until_finish()

if __name__ == '__main__':
    run()
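For reference, I build the template by calling run with DataflowRunner flags along these lines (every project, bucket, and topic name here is a placeholder rather than my real value):

    # Hypothetical launch arguments; all resource names below are placeholders.
    run(argv=[
        '--input_topic', 'projects/my-project/topics/input',
        '--output_topic', 'projects/my-project/topics/output',
        '--runner', 'DataflowRunner',
        '--project', 'my-project',
        '--temp_location', 'gs://my-bucket/tmp',
        '--staging_location', 'gs://my-bucket/staging',
        '--template_location', 'gs://my-bucket/templates/xml-to-json',
        '--streaming',
    ])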

Upvotes: 0

Views: 325

Answers (1)

rmesteves

Reputation: 4085

According to this documentation, your problem is due to names from the main session not being available on the Dataflow workers.

Notice that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error is regarding a module that should be available in the Python distribution, you can solve this by importing the module locally, where it is used.

In your case, you have to import the module inside your process function, as shown below:

class FormatMessage(beam.DoFn):
    def process(self, line):
        import xml.etree.ElementTree as ET
        xml_msg = ET.fromstring(line)

        # Code to construct XML Object (removed) 

        tree_new_xml = ET.ElementTree(element_msg)
        xml_dict = xmltodict.parse(ET.tostring(tree_new_xml.getroot(), encoding='utf8'))
        json_obj = json.dumps(xml_dict).encode('utf8')

        yield json_obj
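A variant of the same idea is to do the imports once per DoFn instance in setup, which runs on the worker, and keep references on self. This is only a sketch, assuming xmltodict and json should be resolved on the worker the same way; it is not tested against your pipeline:

    class FormatMessage(beam.DoFn):
        def setup(self):
            # setup runs on the worker, so these modules are resolved there
            # instead of being pickled from the launcher's main session.
            import xml.etree.ElementTree
            import xmltodict
            import json
            self.ET = xml.etree.ElementTree
            self.xmltodict = xmltodict
            self.json = json

        def process(self, line):
            xml_msg = self.ET.fromstring(line)

            # Code to construct XML Object (removed)

            tree_new_xml = self.ET.ElementTree(element_msg)
            xml_dict = self.xmltodict.parse(
                self.ET.tostring(tree_new_xml.getroot(), encoding='utf8'))
            yield self.json.dumps(xml_dict).encode('utf8')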

Upvotes: 0
