Reputation: 79
Given an ndjson-formatted text file, the following code produces what I expect: an ndjson file with the quotes.USD dict unnested and the original quotes element deleted.
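import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

def unnest_quotes(element):
    # Promote the nested quotes.USD dict to a top-level 'USDquotes' key
    element['USDquotes'] = element['quotes']['USD']
    del element['quotes']
    return element

# pipeline_options, known_args and JsonCoder (assumed user-defined coder) are defined elsewhere
p = beam.Pipeline(options=pipeline_options)
ReadJson = p | ReadFromText(known_args.input, coder=JsonCoder())
MapFormattedJson = ReadJson | 'Map Function' >> beam.Map(unnest_quotes)
MapFormattedJson | 'Write Map Output' >> WriteToText(known_args.output, coder=JsonCoder())
p.run()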
However, when I try to achieve the same thing with a ParDo, I don't understand the behaviour.
class UnnestQuotes(beam.DoFn):
    def process(self, element):
        element['USDquotes'] = element['quotes']['USD']
        del element['quotes']
        return element

p = beam.Pipeline(options=pipeline_options)
ReadJson = p | ReadFromText(known_args.input, coder=JsonCoder())
ClassFormattedJson = ReadJson | 'Pardo' >> beam.ParDo(UnnestQuotes())
ClassFormattedJson | 'Write Class Output' >> WriteToText(known_args.output, coder=JsonCoder())
p.run()
This produces a file with each key of the dict on a separate line and no values, as shown below.
"last_updated"
"name"
"symbol"
"rank"
"total_supply"
"max_supply"
"circulating_supply"
"website_slug"
"id"
"USDquotes"
It's as if each element of the PCollection produced by Map is the full dict, whereas the ParDo produces one element per key.
I know I can just use Map, but I need to understand this behaviour for when I do need to use a ParDo in the future.
Upvotes: 3
Views: 2760
Reputation: 79
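I figured this out with the help of this answer: apache beam flatmap vs map.
What I was experiencing is the same as the difference between FlatMap and Map. A DoFn's process method is expected to return an iterable of output elements, so returning a dict makes Beam iterate over it and emit each key as a separate element. All I needed to do to get the desired behaviour was to wrap the return value of process in a list.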
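To make the Map vs ParDo difference concrete, here is a minimal pure-Python model of how each treats the value your function returns. It is a simplified illustration, not Beam's implementation: `model_map` and `model_pardo` are hypothetical helpers, and the `sample` record is made up to resemble one line of the ndjson input. Iterating the returned dict yields only its keys, which reproduces the key-only output from the question.

```python
# Simplified model (NOT Beam's implementation) of how transforms use return values:
# Map emits the return value as ONE element; ParDo/FlatMap iterate it and
# emit EACH item as a separate element.

def unnest_quotes(element):
    element = dict(element)  # copy so the sample record can be reused
    element['USDquotes'] = element['quotes']['USD']
    del element['quotes']
    return element

def model_map(fn, elements):
    # Map-like: exactly one output element per input element
    return [fn(e) for e in elements]

def model_pardo(fn, elements):
    # ParDo/FlatMap-like: iterate whatever fn returns
    out = []
    for e in elements:
        out.extend(fn(e))  # iterating a dict yields its keys
    return out

# Hypothetical record shaped like one line of the input file
sample = {'id': 'bitcoin', 'name': 'Bitcoin', 'quotes': {'USD': {'price': 1.0}}}

print(model_map(unnest_quotes, [sample]))
# → [{'id': 'bitcoin', 'name': 'Bitcoin', 'USDquotes': {'price': 1.0}}]
print(model_pardo(unnest_quotes, [sample]))
# → ['id', 'name', 'USDquotes']
print(model_pardo(lambda e: [unnest_quotes(e)], [sample]))
# → [{'id': 'bitcoin', 'name': 'Bitcoin', 'USDquotes': {'price': 1.0}}]
```

Wrapping the result in a list (the last call) makes the ParDo-style model produce the same output as the Map-style one, which mirrors the fix below.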
class UnnestQuotes(beam.DoFn):
    def process(self, element):
        element['USDquotes'] = element['quotes']['USD']
        del element['quotes']
        return [element]
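A common equivalent is to write process as a generator with `yield`: since ParDo iterates whatever process returns, yielding the element once emits the same single output as returning `[element]`. The sketch below drops the `beam.DoFn` base class only so it runs without Apache Beam installed, and calls process directly with a hypothetical record instead of running a pipeline.

```python
# Generator-style process(): ParDo iterates the returned generator, so
# `yield element` emits one output, just like `return [element]`.
# In a real pipeline, declare `class UnnestQuotes(beam.DoFn):` instead.

class UnnestQuotes:
    def process(self, element):
        element['USDquotes'] = element['quotes']['USD']
        del element['quotes']
        yield element  # a DoFn may yield zero, one, or many elements

record = {'id': 'bitcoin', 'quotes': {'USD': {'price': 1.0}}}  # hypothetical input
outputs = list(UnnestQuotes().process(record))
print(outputs)
# → [{'id': 'bitcoin', 'USDquotes': {'price': 1.0}}]
```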
Upvotes: 4