Sharvil Popli

Reputation: 158

Apache Beam - Pass a variable from previous step in Python

I am trying to build a simple pipeline that processes a JSON file. My pipeline looks like this:

input_files = 'message.json'

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    # reading json and fetching its contents
    lines = (
            p | 'Read JSON' >> beam.io.ReadFromText(input_files)
              | 'Fetch Data and File Name' >> beam.ParDo(FetchFileName('start'))
        )
    sdata = beam.pvalue.AsSingleton(lines['sdata'])
    transform = (
            lines
            | 'Parallel Transform of Data' >> beam.ParDo(AdjustObject(), service_id=sdata)
            | 'PRINT' >> beam.Map(print)
        )

And FetchFileName looks like this:

class FetchFileName(beam.DoFn):
    def process(self,element):
        element = json.loads(element)
        gdata = element['data']
        for elm in gdata:
            yield elm
        yield beam.pvalue.TaggedOutput('sdata', element['id'])

I am trying to pass the id value (999) as a variable called 'sdata' to the next pipeline step, 'Parallel Transform of Data', but I keep getting the following error:

TypeError: 'PCollection' object is not subscriptable

My JSON file looks like this:

{"id":999,"data":[{"Emp_Id":495},{"Emp_Id":494}]}

Upvotes: 0

Views: 1822

Answers (1)

Kenn Knowles

Reputation: 6023

According to the code you have shared, the step ParDo(FetchFileName('start')) should produce two output PCollections:

  1. The main output, containing each item in element['gdata'] across all the JSON values in all the files.
  2. The output sdata that contains the collection of element['id'] across all the JSON values in all the files.

To produce these two PCollections you need to call the .with_outputs method on your ParDo (not on the DoFn), like this:

results = (
    p | 'Read JSON' >> beam.io.ReadFromText(input_files)
      # with_outputs names the tagged output and the main output
      | 'Fetch Data and File Name' >> beam.ParDo(FetchFileName('start')).with_outputs(
          'sdata',
          main='gdata'))

So if the input files contained

{
  "id": 1,
  "gdata": ["a", "b", "c"]
}
{
  "id": 2,
  "gdata": ["q"]
}
{
  "id": 3,
  "gdata": ["y", "z"]
}

Then the output PCollections would be:

  • results['sdata'] would contain 1, 2, 3
  • results['gdata'] would contain "a", "b", "c", "q", "y", "z"
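
To make the routing concrete, here is a minimal plain-Python sketch (no Beam involved) of how the values from the example above end up in the two outputs. The helpers fetch_outputs and route are hypothetical stand-ins for the DoFn and for with_outputs, not Beam APIs, and the sketch assumes one JSON record per line. Real PCollections are unordered, so the list order here is only an artifact of the simulation.

```python
import json

def fetch_outputs(line):
    """Hypothetical stand-in for FetchFileName.process: yields (tag, value) pairs."""
    record = json.loads(line)
    for item in record['gdata']:
        yield ('gdata', item)      # what the DoFn yields to the main output
    yield ('sdata', record['id'])  # what the DoFn yields as TaggedOutput('sdata', ...)

def route(lines):
    """Hypothetical stand-in for with_outputs: collects values per output tag."""
    results = {'gdata': [], 'sdata': []}
    for line in lines:
        for tag, value in fetch_outputs(line):
            results[tag].append(value)
    return results

# One JSON record per line, matching the example input.
lines = [
    '{"id": 1, "gdata": ["a", "b", "c"]}',
    '{"id": 2, "gdata": ["q"]}',
    '{"id": 3, "gdata": ["y", "z"]}',
]

results = route(lines)
print(results['sdata'])  # [1, 2, 3]
print(results['gdata'])  # ['a', 'b', 'c', 'q', 'y', 'z']
```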

See the full documentation at https://beam.apache.org/documentation/programming-guide/#additional-outputs.

Now I see another problem: the next line would probably not work as expected. AsSingleton requires a collection with exactly one element, so if results['sdata'] contains many values (one id per JSON record), you cannot apply it. I do not know your full desired result, but I hope this helps to explain the first part to get you further toward your goal.
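
As a rough illustration of that constraint, the following plain-Python sketch mimics the "exactly one element" rule. The function as_singleton is a hypothetical stand-in written for this example, not the Beam implementation. With the single record from the question there is one id, so the lookup succeeds; with several records it fails.

```python
def as_singleton(values):
    """Hypothetical stand-in for a singleton side input: exactly one value allowed."""
    values = list(values)
    if len(values) != 1:
        raise ValueError(f'expected exactly 1 element, got {len(values)}')
    return values[0]

one_record_ids = [999]       # the question's file: a single JSON record
many_record_ids = [1, 2, 3]  # the three-record example from this answer

service_id = as_singleton(one_record_ids)
print(service_id)

try:
    as_singleton(many_record_ids)
    failed = False
except ValueError as err:
    failed = True
    print(err)
```

If the input can hold more than one record, a design that pairs each data item with its own id inside the DoFn may fit better than a single side input, but that depends on the result you want.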

Upvotes: 1
