Hippocrates

Reputation: 2540

Possible to atomically move data from one table to another in BigQuery?

Is there any way to move data from one table to another in BQ?

My case: I have some data continuously being dropped into a non-partitioned table, created by a process I don't control. The table gets very large and becomes hard to query efficiently, so I want to partition it. To do that, I can use Airflow to repeatedly SELECT INTO ... and copy to an identical table that is partitioned. It makes sense to then delete the data I've copied from the source table, since it is no longer needed and makes future SELECT INTOs more costly.

So far I've got a separate SELECT INTO ... followed by a DELETE FROM ... that gets the job done, but I wonder if there is something that would work more atomically and just move the data.

Another solution might be adding partitioning to a pre-existing table, but I haven't seen anything to suggest that is possible.

I looked into BQ's MERGE INTO, but it doesn't seem like I can delete from the source table with that.

Thanks

Upvotes: 0

Views: 1340

Answers (1)

aga

Reputation: 3883

You can think about using scheduled queries, which let you schedule your query to run at whatever interval suits your needs. This also works well with partitioned tables, since it lets you prune the data each query scans, improving performance and reducing costs.
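As a rough sketch only, a scheduled query for the copy step can also be created programmatically, assuming a recent version of the google-cloud-bigquery-datatransfer client; the project, dataset and table names below are placeholders:

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

# Placeholder names - replace with your own project, dataset and tables.
project_id = "my-bq-project"
dataset_id = "my_bq_dataset"

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id=dataset_id,
    display_name="Copy into partitioned table",
    data_source_id="scheduled_query",
    params={
        "query": "SELECT * FROM `my-bq-project.my_bq_dataset.source_table`",
        "destination_table_name_template": "daily_metrics",
        "write_disposition": "WRITE_APPEND",
    },
    schedule="every 24 hours",
)

transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path(project_id),
    transfer_config=transfer_config,
)
print("Created scheduled query: {}".format(transfer_config.name))

Keep in mind that a scheduled query only covers the copy step; removing the copied rows from the source table would still have to happen separately.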

Another possible solution that comes to my mind is running jobs programmatically, i.e. from Python code. Note that it is not possible to rerun a job using the same job ID; instead, you create a new job with the same configuration.
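For illustration only, here is a minimal sketch of the copy-then-delete steps with the google-cloud-bigquery client, assuming a recent client version and an already-partitioned destination table (all names below are placeholders):

from google.cloud import bigquery

client = bigquery.Client()

# The destination table is assumed to already exist and be partitioned.
job_config = bigquery.QueryJobConfig(
    destination="my-bq-project.my_bq_dataset.daily_metrics",
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Copy the data into the partitioned table.
copy_job = client.query(
    "SELECT * FROM `my-bq-project.my_bq_dataset.source_table`",
    job_config=job_config,
)
copy_job.result()  # wait for the copy job to finish

# Only clear the source table once the copy has succeeded.
delete_job = client.query(
    "DELETE FROM `my-bq-project.my_bq_dataset.source_table` WHERE TRUE"
)
delete_job.result()

Each call to query() starts a new job with a freshly generated job ID. Note that these are still two separate jobs, so the operation is not atomic; the delete simply does not run unless the copy finished successfully.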

And finally, this is a perfect scenario for Cloud Composer using the bigquery_operator. You can set a schedule for running your pipeline (e.g. once a day):

import datetime

from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_args = {
    "start_date": yesterday,
    "retries": 1,
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "[email protected]"
}

with models.DAG(
    'copy_bq_data', 
    schedule_interval=datetime.timedelta(days=1), 
    default_args=default_args) as dag:

    BQ_PROJECT = "my-bq-project"
    BQ_DATASET = "my-bq-dataset"

    # Task 1: copy data into the partitioned destination table
    creating_partition_table = BigQueryOperator(
        task_id='bq_write',
        sql='''
        #standardSQL
        SELECT
          ...
        ''',
        destination_dataset_table='{0}.{1}.daily_metrics'.format(
            BQ_PROJECT, BQ_DATASET),
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        use_legacy_sql=False
    )


    # Task 2: remove the copied data from the source table
    remove_data_from_table = BigQueryOperator(
        task_id='bq_remove',
        sql='''
        #standardSQL
        DELETE
        FROM
          `source_table`
        WHERE TRUE
        ''',
        use_legacy_sql=False
    )

    creating_partition_table >> remove_data_from_table

Additionally, please take a look at the following Stack Overflow thread. I hope you find the above pieces of information useful.

Upvotes: 1
