Programmer120

Reputation: 2592

How to dynamically create operators with different params

I have the following code:

def chunck_import(**kwargs):
    ...
    logging.info('Number of pages required is: {0}'.format(num_pages))
    for i in range(1, num_pages + 1):
        ...
        parameter_where = 'where orders_id between {0} and {1}'.format(start, end)
        logging.info(parameter_where)

chunck_import_op = PythonOperator(
    task_id='chunck_import',
    provide_context=True,
    python_callable=chunck_import,
    dag=dag)


start_task_op >> ... >> chunck_import_op

This operator creates multiple WHERE clauses:

INFO - From 7557920 to 7793493
INFO - Number of pages required is: 4
where orders_id between 7607920 and 7657920
where orders_id between 7657921 and 7707920
where orders_id between 7707921 and 7757920
where orders_id between 7757921 and 7793493

Now, I have a MySqlToGoogleCloudStorageOperator as follows:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    ...
    sql='select * from orders {{ params.where_cluster }}',
    params={'where_cluster': parameter_where},
    dag=dag) 

The chunck_import_op knows the number of times I need to call the MySqlToGoogleCloudStorageOperator (num_pages). It also creates the string that I need to pass as a parameter (parameter_where).

My issue is how to dynamically create one MySqlToGoogleCloudStorageOperator per page according to num_pages, and pass the matching parameter_where to each of them.

Upvotes: 1

Views: 1099

Answers (2)

dlamblin

Reputation: 45381

I would subclass MySqlToGoogleCloudStorageOperator so that it pages through the query itself: override execute to run one query per page, driven by a page-size parameter passed to the operator. This is some extra work, but I recommend it over the other options below.
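A minimal sketch of that idea. The start_id, end_id and page_size parameters are hypothetical (they don't exist on the stock operator), and the filename is assumed to carry a page placeholder such as 'orders/page_{0}_{{}}.json' so each page gets its own GCS object:

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


class PagedMySqlToGcsOperator(MySqlToGoogleCloudStorageOperator):
    """Runs the base query once per page of orders_id values."""

    def __init__(self, start_id, end_id, page_size, *args, **kwargs):
        super(PagedMySqlToGcsOperator, self).__init__(*args, **kwargs)
        self.start_id = start_id
        self.end_id = end_id
        self.page_size = page_size

    def execute(self, context):
        base_sql = self.sql            # e.g. 'select * from orders'
        base_filename = self.filename  # e.g. 'orders/page_{0}_{{}}.json'
        lower, page = self.start_id, 0
        while lower <= self.end_id:
            upper = min(lower + self.page_size - 1, self.end_id)
            self.sql = '{0} where orders_id between {1} and {2}'.format(
                base_sql, lower, upper)
            # Give each page its own object name; the remaining '{}'
            # placeholder is left for the parent's file-chunk numbering.
            self.filename = base_filename.format(page)
            super(PagedMySqlToGcsOperator, self).execute(context)
            lower = upper + 1
            page += 1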

You cannot, however, have a PythonOperator, or any other operator, modify the DAG at runtime and have the change be recognized and scheduled. The most it could do is one of the following:

  1. After building the where clause, construct a MySqlToGoogleCloudStorageOperator with the clause and call execute on it right within the PythonOperator (sketched after this list). This will work, and you will see the MySqlToGoogleCloudStorageOperator's log messages right in the PythonOperator's logs.
  2. Use the PythonOperator or a TriggerDagRunOperator to trigger a second DAG containing just the MySqlToGoogleCloudStorageOperator, passing the clause in as a parameter, or pushing it to XCom for that DAG first. That other DAG should probably have its schedule_interval set to None. This makes following the logs a little harder, but it allows the triggered runs to proceed in parallel.
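Here is a rough sketch of option 1. The bucket, filename pattern, connection id and the id/page-size values are stand-ins for whatever your callable already computes:

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


def chunck_import(**kwargs):
    # Stand-ins for the values your callable already computes:
    start, end, page_size = 7607920, 7793493, 50000
    lower, page = start, 1
    while lower <= end:
        upper = min(lower + page_size - 1, end)
        parameter_where = 'where orders_id between {0} and {1}'.format(lower, upper)
        import_op = MySqlToGoogleCloudStorageOperator(
            task_id='import_orders_page_{0}'.format(page),
            sql='select * from orders {0}'.format(parameter_where),
            bucket='my-bucket',             # assumption
            filename='orders/page_{0}_{{}}.json'.format(page),
            mysql_conn_id='mysql_default',  # assumption
        )
        # Runs synchronously; its log lines land in this task's log.
        import_op.execute(context=kwargs)
        lower = upper + 1
        page += 1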

If it were my DAG and I weren't subclassing, my approach would instead be to always provision for 1 to X pages. Suppose your DAG should handle a maximum of X pages of results, where X is, say, 10. Then define 10 branches off chunck_import_op's parent; you won't need chunck_import_op or its callable at all.

  • Each branch starts with a ShortCircuitOperator, all of them calling the same callable with a different offset argument (0 through 9). The callable checks whether offset * page_size is past the end of the range; if so it returns False, skipping the branch's downstream operators. Otherwise it pushes a valid query for that offset's range into XCom and returns True so they run.
  • Each branch continues with a MySqlToGoogleCloudStorageOperator whose query is set to {{ ti.xcom_pull('<ShortCircuitOperator_N>') }}, where the string is the task_id of the preceding ShortCircuitOperator.
  • If you need other operators after the MySqlToGoogleCloudStorageOperators, first add a DummyOperator with trigger_rule ALL_DONE as a child of all of the MySqlToGoogleCloudStorageOperators, then add the other operators as children of that one.

This way you can run 1 to 10 paged queries as necessary. They might run in parallel; I don't think that's a problem, but it's worth keeping in mind.
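A sketch of that layout, assuming fixed id bounds and page size for brevity (in practice the callable would look them up), and using a dedicated XCom key so the pushed query cannot clash with the operator's own return-value push:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

MAX_PAGES = 10      # X: the most pages the DAG will ever handle
PAGE_SIZE = 50000   # assumption: matches the ranges in your log
START_ID = 7607920  # assumption: in practice look these up
END_ID = 7793493

dag = DAG('orders_paged_export', start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')


def check_page(offset, **context):
    """Skip the branch past the last page; otherwise push the paged
    query into XCom and let the branch run."""
    lower = START_ID + offset * PAGE_SIZE
    if lower > END_ID:
        return False
    upper = min(lower + PAGE_SIZE - 1, END_ID)
    context['ti'].xcom_push(
        key='paged_query',
        value='select * from orders where orders_id between {0} and {1}'.format(
            lower, upper))
    return True


start_task_op = DummyOperator(task_id='start', dag=dag)
join_op = DummyOperator(task_id='join', trigger_rule='all_done', dag=dag)

for offset in range(MAX_PAGES):
    check_op = ShortCircuitOperator(
        task_id='check_page_{0}'.format(offset),
        python_callable=check_page,
        op_kwargs={'offset': offset},
        provide_context=True,
        dag=dag)
    import_op = MySqlToGoogleCloudStorageOperator(
        task_id='import_orders_page_{0}'.format(offset),
        sql="{{{{ ti.xcom_pull('check_page_{0}', key='paged_query') }}}}".format(offset),
        bucket='my-bucket',             # assumption
        filename='orders/page_{0}_{{}}.json'.format(offset),
        mysql_conn_id='mysql_default',  # assumption
        dag=dag)
    start_task_op >> check_op >> import_op >> join_op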

Upvotes: 1

SergiyKolesnikov

Reputation: 7815

Airflow provides the XCom mechanism for tasks (operators) to communicate with one another. In your concrete scenario, the chunck_import task can precompute all the where clauses first and push them into an XCom; the import_orders task can then pull the XCom, read the clauses, and use them as needed.
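A minimal sketch of that flow, with the bucket, filename, connection id and the id/page-size values as placeholder assumptions. The callable returns the clause list, which Airflow pushes to XCom as its return value, and the templated sql field pulls it back:

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


def chunck_import(**kwargs):
    # Stand-ins for the values your callable already computes:
    start, end, page_size = 7607920, 7793493, 50000
    where_clauses = []
    lower = start
    while lower <= end:
        upper = min(lower + page_size - 1, end)
        where_clauses.append('where orders_id between {0} and {1}'.format(lower, upper))
        lower = upper + 1
    # The returned list is pushed to XCom under the key 'return_value'.
    return where_clauses


import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    # sql is a templated field, so the list can be pulled and indexed
    # at render time; only the first clause is used here to illustrate.
    sql="select * from orders {{ ti.xcom_pull('chunck_import')[0] }}",
    bucket='my-bucket',             # assumption
    filename='orders/page_0_{}.json',
    mysql_conn_id='mysql_default',  # assumption
    dag=dag)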

If this mechanism does not fit your application logic, then please amend your question and explain why not.

Upvotes: 0
