Reputation: 2540
Is there any way to move data from one table to another in BQ?
My case:
I have some data continuously being dropped into a non-partitioned table, created by a process I don't control. The table gets very large and becomes hard to query efficiently, so I want to partition it. To do that, I can use Airflow to repeatedly SELECT INTO ... and copy into an identical table that is partitioned. It then makes sense to delete the data I've copied from the source table, since it is no longer needed and makes future SELECT INTOs more costly.
So far I've got a separate SELECT INTO ... followed by a DELETE FROM ... that gets the job done, but I wonder if there is something that would work more atomically and just move the data.
Another solution might be adding partitioning to a pre-existing table, but I haven't seen anything to suggest that is possible.
I looked into BQ's MERGE INTO, but it doesn't seem like I can delete from the source table with that.
Thanks
Upvotes: 0
Views: 1340
Reputation: 3883
You can think about using scheduled queries, which let you schedule a query to run at whatever interval suits your needs. This also works well with partitioned tables, since partition pruning improves query performance and reduces costs.
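For reference, a scheduled query can also be created programmatically with the BigQuery Data Transfer Service client. The snippet below is only a rough sketch with made-up project, dataset and table names, and the exact fields may vary with your client library version:
from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

# Scheduled query that appends the source rows into a partitioned table.
# All names below (my-project, my_dataset, partitioned_table, source_table)
# are placeholders.
transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="my_dataset",
    display_name="Copy into partitioned table",
    data_source_id="scheduled_query",
    params={
        "query": "SELECT * FROM `my-project.my_dataset.source_table`",
        "destination_table_name_template": "partitioned_table",
        "write_disposition": "WRITE_APPEND",
    },
    schedule="every 24 hours",
)

transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path("my-project"),
    transfer_config=transfer_config,
)
print("Created scheduled query:", transfer_config.name)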
Another possible solution that comes to mind is running jobs programmatically, e.g. from Python code. Note that it is not possible to rerun a job using the same job ID; instead, you create a new job with the same configuration.
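As an illustration, the copy and the cleanup could be run as two query jobs with the google-cloud-bigquery client. This is just a sketch with placeholder table names, not a drop-in solution:
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project

# Append the source rows into the (already partitioned) destination table.
destination = bigquery.TableReference.from_string(
    "my-project.my_dataset.partitioned_table")
job_config = bigquery.QueryJobConfig(
    destination=destination,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
copy_job = client.query(
    "SELECT * FROM `my-project.my_dataset.source_table`",
    job_config=job_config,
)
copy_job.result()  # wait for the copy to finish before deleting anything

# Only then remove the copied rows from the source table.
delete_job = client.query(
    "DELETE FROM `my-project.my_dataset.source_table` WHERE TRUE")
delete_job.result()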
And finally, this is a perfect scenario for Cloud Composer using the BigQueryOperator. You can set a schedule for running your pipeline (e.g. once a day):
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
import datetime

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_args = {
    "start_date": yesterday,
    "retries": 1,
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "[email protected]"
}

with models.DAG(
        'copy_bq_data',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_args) as dag:

    BQ_PROJECT = "my-bq-project"
    BQ_DATASET = "my-bq-dataset"

    # Task 1: copy the data into the partitioned table
    creating_partition_table = BigQueryOperator(
        task_id='bq_write',
        sql='''
        #standardSQL
        SELECT
        ...
        ''',
        destination_dataset_table='{0}.{1}.daily_metrics'.format(
            BQ_PROJECT, BQ_DATASET),
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        use_legacy_sql=False
    )

    # Task 2: remove the copied data from the source table
    remove_data_from_table = BigQueryOperator(
        task_id='bq_remove',
        sql='''
        #standardSQL
        DELETE
        FROM
        `source_table`
        WHERE TRUE
        ''',
        use_legacy_sql=False
    )

    creating_partition_table >> remove_data_from_table
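Since the delete task is downstream of the copy task, Airflow will only run the DELETE once the copy has completed successfully, which is about as close to an atomic "move" as you can get with two separate statements.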
Additionally, please take a look at the following Stack Overflow thread. I hope you find the above information useful.
Upvotes: 1