Roghen

Reputation: 31

Airflow to interact with BigQuery outside of the DAG not using BigQueryOperators

I'm struggling to find a solution for the following problem on Airflow:

I have a table on BQ with a list of products (incremented on a regular basis), and each product has a different project on BigQuery/Google Cloud. Let's say:

PRODUCT | ID | PROJECT_ID | PARAM_1 | PARAM_2

My current pipeline on Jenkins uses a for loop to build parallel DAGs for each product, and it works very well.

As I was translating it to an Airflow DAG, I was able to achieve the following:

...
product_params = {
    'Product1': {
        'project_id': 'product-1',
        'color': 'Blue'
    },
    'Product2': {
        'project_id': 'product-2',
        'color': 'Red'
    },...
}


my_dag = DAG(
    'My_Default_DAG',
    schedule_interval='@once',
    default_args=default_args
    )

dag_tasks = {}

with my_dag as dag:
    for product_name, p_params in product_params.items():
        query_params = {
            'product_name': product_name,
            'product_project': p_params['project_id'],
            'color': p_params['color'],
            'event_date': '2019-12-01',
            'event_date_suffix': '20191201'
        }

        dag_tasks[product_name] = {}

        dag_tasks[product_name]['step_1'] = BigQueryOperator(
                task_id="{0}_step_1".format(product_name),
                bql='sql_folder/step-1.sql',
                use_legacy_sql=False,
                destination_dataset_table="{0}.prod_dataset.step1Table_{1}".format(p_params['project_id'], query_params['event_date_suffix']),
                write_disposition='WRITE_TRUNCATE',
                params=query_params
            )
        ### following steps...

Ideally I would like to query my product params directly from BigQuery. I already have a Python lib developed for that on Bitbucket, with a bunch of other methods that Jenkins uses extensively.

Is there any way I could import that lib into Airflow and use it in my DAGs?

Otherwise, is there any other way I could build methods that interact with BigQuery other than via BigQueryOperators?

Upvotes: 0

Views: 160

Answers (2)

Javier Bóbeda

Reputation: 468

You can install your own Python libraries in Composer from a private repository or from a local file.

It is advisable to test your PyPI packages locally in an Airflow worker container before deploying to production, as custom packages might cause conflicts with dependencies that Airflow requires.

Upvotes: 0

kaxil

Reputation: 18834

Yes, you can import your library in your DAG files and call it from a PythonOperator.
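A minimal sketch of that pattern, mapped onto the question's setup. `my_bq_lib` and `get_product_params` are hypothetical stand-ins for the asker's Bitbucket library; only `row_to_query_params` is concrete, runnable code that turns a fetched product row into the params dict the SQL templates expect.

```python
def row_to_query_params(row, event_date="2019-12-01"):
    """Map one product row (PRODUCT | ID | PROJECT_ID | PARAM_1 | PARAM_2)
    to the params dict used by the templated SQL steps."""
    return {
        "product_name": row["PRODUCT"],
        "product_project": row["PROJECT_ID"],
        "color": row["PARAM_1"],
        "event_date": event_date,
        "event_date_suffix": event_date.replace("-", ""),
    }


def run_product_queries(**context):
    # Inside the task, import and call your own library; Airflow only
    # needs it installed on the workers (see the other answer).
    # import my_bq_lib                              # hypothetical
    # for row in my_bq_lib.get_product_params():    # hypothetical
    #     params = row_to_query_params(row)
    #     ...run the per-product queries with your lib...
    pass


# Wiring it into the DAG from the question (sketch):
# from airflow.operators.python_operator import PythonOperator
# fetch_task = PythonOperator(
#     task_id="run_product_queries",
#     python_callable=run_product_queries,
#     provide_context=True,
#     dag=my_dag,
# )
```

Note that calling BigQuery inside `python_callable` runs at task execution time; if you instead want the product list to drive how many tasks the DAG contains (as in your Jenkins loop), that query has to run at DAG-parse time, which means it executes every time the scheduler parses the file.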

Upvotes: 1
