Deepak Verma

Reputation: 673

Apache Beam Python BigQuery: change streaming insert into batch insert?

I am running an Apache Beam Dataflow job which reads from a bucket, performs some transformations and writes to BigQuery. But the records are inserted into the streaming buffer.

validated_data = (p1
                  | 'Read files from Storage ' + url >> beam.io.ReadFromText(url)
                  | 'Validate records ' + url >> beam.Map(data_ingestion.validate, url)
                        .with_outputs(SUCCESS_TAG_KEY, FAILED_TAG_KEY, main="main")
)
all_data, _, _ = validated_data
success_records = validated_data[SUCCESS_TAG_KEY]
failed_records = validated_data[FAILED_TAG_KEY]


(success_records
 | 'Extracting row from tagged row {}'.format(url) >> beam.Map(lambda row: row['row'])
 | 'Write to BigQuery table for {}'.format(url) >> beam.io.WriteToBigQuery(
            table=data_ingestion.get_table(tmp=TEST, run_date=data_ingestion.run_date),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
)

Actually, I need to delete the partition before running the pipeline above, as a way to avoid duplicate records in the ingestion-time partitioned table.

Say I run this job more than once for the same file without truncating the table; the table will end up with duplicate records.

And because the last records are still in the streaming buffer, the delete-partition command does not actually remove the partition. Below is the code I am using to truncate the partition; it runs before the pipeline starts.

from google.cloud import bigquery

client = bigquery.Client()
dataset = TABLE_MAP['dataset']
table = TABLE_MAP[sentiment_pipeline][table_type]['table']
# table_id addresses a single ingestion-time partition, e.g. my_table$20200101
table_id = "{}${}".format(table, format_date(run_date, '%Y%m%d'))
table_ref = client.dataset(dataset).table(table_id)
output = client.delete_table(table_ref)
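
One guard I could presumably add before the delete is to check whether the base table still reports a streaming buffer; a rough, untested sketch using the same client and variables as above:

# Untested: only delete the partition once BigQuery no longer reports a streaming buffer.
base_table = client.get_table(client.dataset(dataset).table(table))  # base table, without the $YYYYMMDD decorator
if base_table.streaming_buffer is None:
    client.delete_table(table_ref)
else:
    print("Streaming buffer not flushed yet; rows in it would survive the partition delete.")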

Upvotes: 3

Views: 1396

Answers (1)

Tlaquetzal

Reputation: 2850

According to the BigQuery documentation, you may have to wait up to 30 minutes before a DML statement can touch a table that has recently received streaming inserts, and schema changes like deleting/truncating tables might result in data loss in some scenarios. Here are some workarounds you could try for dealing with duplicates in a streaming scenario.
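
For example, one of those workarounds is to periodically rewrite the data keeping a single row per key. A rough sketch (the unique_key column, project/dataset/table names and the separate deduplicated destination table are placeholders you would need to adapt):

from google.cloud import bigquery

client = bigquery.Client()

# Keep one row per unique_key and write the result to a separate deduplicated table.
dedup_sql = """
SELECT * EXCEPT(row_num)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY unique_key) AS row_num
  FROM `my_project.my_dataset.my_table`
)
WHERE row_num = 1
"""
dest = bigquery.TableReference.from_string("my_project.my_dataset.my_table_dedup")
job_config = bigquery.QueryJobConfig(
    destination=dest,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
client.query(dedup_sql, job_config=job_config).result()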

Additionally, Apache Beam and Dataflow now support batch inserts for Python, so this might be a good way to avoid the streaming limitations.
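
For example, a minimal sketch adapted from the write step in your question (not tested): it switches WriteToBigQuery to batch file loads, and custom_gcs_temp_location is an assumed staging bucket you would need to provide.

import apache_beam as beam

# Sketch only: same table and dispositions as in the question, but with
# method=FILE_LOADS so rows are written via load jobs instead of the
# streaming buffer. The GCS temp location is a placeholder.
(success_records
 | 'Extract row' >> beam.Map(lambda row: row['row'])
 | 'Write to BigQuery via load jobs' >> beam.io.WriteToBigQuery(
        table=data_ingestion.get_table(tmp=TEST, run_date=data_ingestion.run_date),
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
        custom_gcs_temp_location='gs://your-temp-bucket/bq_load_tmp')
)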

Upvotes: 0
