ThinkNinja

Reputation: 79

Apache Beam explanation of ParDo behaviour

Given an ndjson-formatted text file, the following code produces what I would expect: an ndjson file with the quotes.USD dict unnested and the original quotes element deleted.

  def unnest_quotes(element):
      element['USDquotes'] = element['quotes']['USD']
      del element['quotes']
      return element

  p = beam.Pipeline(options=pipeline_options)
  ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
  MapFormattedJson = ReadJson | 'Map Function' >> beam.Map(unnest_quotes)
  MapFormattedJson | 'Write Map Output' >> WriteToText(known_args.output,coder=JsonCoder())

However, when I try to achieve the same thing with a ParDo, I don't understand the behaviour.

  class UnnestQuotes(beam.DoFn):
    def process(self,element):
      element['USDquotes'] = element['quotes']['USD']
      del element['quotes']
      return element

  p = beam.Pipeline(options=pipeline_options)
  ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
  ClassFormattedJson = ReadJson | 'Pardo' >> beam.ParDo(UnnestQuotes())
  ClassFormattedJson | 'Write Class Output' >> WriteToText(known_args.output,coder=JsonCoder())

This produces a file with each key of the dict on a separate line, with no value, as shown below.

"last_updated"
"name"
"symbol"
"rank"
"total_supply"
"max_supply"
"circulating_supply"
"website_slug"
"id"
"USDquotes"

It's as if the PCollection produced by the Map function contains the full dict as a single element, whereas the ParDo produces a separate element for each key.

I know I can just use the Map function, but I need to understand this behaviour for when I do need to use a ParDo in the future.

Upvotes: 3

Views: 2760

Answers (1)

ThinkNinja

Reputation: 79

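I figured this out with the help of this answer: "apache beam flatmap vs map".

What I was experiencing was the same as the difference between FlatMap and Map. A DoFn's process method is expected to return an iterable of output elements, and iterating over a dict yields its keys, so each key was emitted as a separate element. All I needed to do to get the desired behaviour was to wrap the return value from the ParDo in a list.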

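  class UnnestQuotes(beam.DoFn):
    def process(self,element):
      element['USDquotes'] = element['quotes']['USD']
      del element['quotes']
      # process() returns an iterable of outputs; the one-item list
      # emits the whole dict as a single element.
      return [element]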
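To see why the bare return produced one line per key, here is a rough plain-Python sketch that does not use Beam at all. The emit_outputs helper is a hypothetical stand-in I made up for whatever consumes the return value of process, not Beam's actual code, and the record values are invented sample data. It only relies on the fact that iterating over a dict yields its keys.

```python
def emit_outputs(process_result):
    """Hypothetical stand-in for a runner: each item of the iterable
    returned by process() becomes one output element."""
    return list(process_result)


def process_bare(element):
    # Returns the dict itself; iterating a dict yields its keys.
    return element


def process_wrapped(element):
    # Returns a one-item list, so the whole dict is a single output.
    return [element]


def process_yield(element):
    # A generator is also an iterable; yielding once emits one element.
    yield element


# Invented sample record, shaped like the already-unnested output.
record = {"name": "Bitcoin", "symbol": "BTC", "USDquotes": {"price": 1.0}}

bare = emit_outputs(process_bare(record))
wrapped = emit_outputs(process_wrapped(record))
yielded = emit_outputs(process_yield(record))

print(bare)     # one output per key
print(wrapped)  # a single output: the full dict
print(yielded)  # same as wrapped
```

In a real DoFn, using yield element inside process should be equivalent to return [element], and it avoids building a list when a single input produces several outputs.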

Upvotes: 4
