Reputation: 197
I am using Apache Beam to run three steps in one pipeline: read from MongoDB, transform the documents, and write them to BigQuery.
I noticed that step 3 executed before the schema was updated in step 2, and I got errors saying "Error while reading data, error message: JSON table encountered too many errors, giving up". However, if I run the same code again, the data is saved successfully. It looks like, on the first run, step 3 executes before the complete schema has been updated in step 2.
I am new to Apache Beam. Could you please help with it? Thanks. My code is attached below.
dim_seller_etl_executor = (
    p1
    | "read" >> beam.io.ReadFromMongoDB(uri='mongodb:///',
                                        db='',
                                        coll='',
                                        bucket_auto=True,
                                        extra_client_params={"username": "",
                                                             "password": ""})
    | "transform" >> beam.Map(transform_doc)
    # WriteToBigQuery is already a PTransform; wrapping it in beam.io.Write is unnecessary
    | "save" >> beam.io.WriteToBigQuery("table_id",
                                        schema=table_schema_for_beam,
                                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
def transform_doc(document):
    global table_schema_for_beam
    global column_name_type
    data = {}
    new_columns = []
    for name, value in document.items():
        if name not in column_name_type:
            # some ways to get the column type
            new_columns.append((name, column_type))
        else:
            column_type = column_name_type[name]
        data[name] = document[name] if document[name] is not None else None
    # if new columns appear, update the schema in BigQuery and the schema used in beam.io.WriteToBigQuery
    if new_columns:
        bigquery_schema.add_columns(new_columns)
        table_schema_for_beam, column_name_type = bigquery_schema.get_table_schema_for_beam()
    return data
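The column-tracking part of transform_doc can be exercised in isolation. This is a sketch with stand-in values: the column_name_type mapping and the type-inference rule below are hypothetical, since the real lookup is elided in the code above.

```python
# Toy version of the new-column detection inside transform_doc,
# with a hypothetical type-inference rule standing in for the
# elided "some ways to get the column type" step.
column_name_type = {"seller_id": "INTEGER"}  # stand-in known schema

def find_new_columns(document):
    """Collect (name, type) pairs for fields missing from the known schema."""
    new_columns = []
    for name, value in document.items():
        if name not in column_name_type:
            # stand-in type inference: strings -> STRING, everything else -> INTEGER
            column_type = "STRING" if isinstance(value, str) else "INTEGER"
            new_columns.append((name, column_type))
    return new_columns

doc = {"seller_id": 42, "seller_name": "acme"}
print(find_new_columns(doc))  # [('seller_name', 'STRING')]
```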
Upvotes: 1
Views: 289
Reputation: 6023
I can see two issues:
The table_schema_for_beam variable is evaluated during pipeline construction. Mutating this variable while the pipeline is running may have unknown effects, or no effect at all.
Upvotes: 1
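The construction-time capture can be demonstrated without Beam at all. In this pure-Python sketch, FakeWriteToBigQuery is a hypothetical stand-in that simply records the schema it is constructed with, which is what happens when the real transform is built:

```python
# Why mutating a global after pipeline construction has no effect:
# the schema value is captured when the transform is constructed,
# not when the pipeline later runs. (No Beam dependency; FakeWriteToBigQuery
# is a stand-in for the real transform.)

table_schema = "id:INTEGER"

class FakeWriteToBigQuery:
    """Records the schema passed at construction time, like the real transform."""
    def __init__(self, schema):
        self.schema = schema  # evaluated NOW, during pipeline construction

def transform_doc(doc):
    global table_schema
    table_schema = "id:INTEGER,name:STRING"  # runtime mutation of the global
    return doc

sink = FakeWriteToBigQuery(schema=table_schema)  # "pipeline construction"
transform_doc({"id": 1, "name": "x"})            # "pipeline execution"

print(sink.schema)  # still 'id:INTEGER': the mutation never reached the sink
```

The global was indeed updated, but the sink keeps the value it saw at construction time, which matches the behavior described in the question (failure on the first run, success on a rerun once the schema already exists).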