Reputation: 90
I'm facing a situation with beam.GroupByKey(), I've loaded a file whose amount of lines is 42.854.
Due to the business rules, I need to execute a GroupByKey(); However, After finishing its execution I noticed that I got almost the double of lines. As you can see below :
Step before the GroupByKey():
Why am I having this behavior ?
I'm not doing anything special in my pipeline:
with beam.Pipeline(runner, options=opts) as p:
#LOAD FILE
elements = p | 'Load File' >> beam.Create(fileRNS.values)
#PREPARE VALUES (BULK INSERT)
Script_Values = elements | 'Prepare Bulk Insert' >> beam.ParDo(Prepare_Bulk_Insert())
Grouped_Values = Script_Values | 'Grouping values' >> beam.GroupByKey()
#BULK INSERT INTO POSTGRESQL
Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(ExecuteInsert)
2021-02-09
When I debug, Prepare_Bulk_Insert() has the following content :
As you can see, the amount of elements is correct, I don't understand why GroupByKey() has its input with a higher amount of elements if I'm sending the correct amount.
The Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(funcaoMap) has its input as follows:
Double amount. =(
Kind regards, Juliano Medeiros
Upvotes: 0
Views: 309
Reputation: 3010
Those screenshots indicate that the "Prepare Bulk Insert" DoFn is outputting more that one element per input element. Your first screenshot is showing the input PCollection of the GBK (which is produced by the DoFn) and the second is the input to the DoFn, so the difference must be produced by that DoFn.
Upvotes: 1