Reputation: 2592
I have the following code:
def chunck_import(**kwargs):
    ...
    logging.info('Number of pages required is: {0}'.format(num_pages))
    for i in range(1, num_pages + 1):
        ...
        parameter_where = 'where orders_id between {0} and {1}'.format(start, end)
        logging.info(parameter_where)
chunck_import_op = PythonOperator(
    task_id='chunck_import',
    provide_context=True,
    python_callable=chunck_import,
    dag=dag)
start_task_op >> ... >> chunck_import_op
This operator creates multiple WHERE clauses:
INFO - From 7557920 to 7793493
INFO - Number of pages required is: 4
where orders_id between 7607920 and 7657920
where orders_id between 7657921 and 7707920
where orders_id between 7707921 and 7757920
where orders_id between 7757921 and 7793493
Now, I have a MySqlToGoogleCloudStorageOperator as follows:
import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    ...
    sql='select * from orders {{ params.where_cluster }}',
    params={'where_cluster': parameter_where},
    dag=dag)
The chunck_import_op knows how many times I need to call the MySqlToGoogleCloudStorageOperator (num_pages). It also creates the string that I need to pass as a parameter (parameter_where). My issue is how to dynamically create the MySqlToGoogleCloudStorageOperator according to num_pages and pass parameter_where to it.
Upvotes: 1
Views: 1099
Reputation: 45381
I would subclass MySqlToGoogleCloudStorageOperator to customize the query, overriding its execute steps to produce a paged query according to a page size parameter passed to the operator. This is some extra work, but it is recommended over the other options here.
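As a rough illustration of what that subclass could look like, here is a minimal sketch. The class name, the constructor parameters, and the idea of passing the id range in at DAG-definition time are all assumptions of mine, and the Airflow 1.x contrib import path is assumed:

# Hypothetical sketch, not the library's API: a paged variant of the operator.
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.utils.decorators import apply_defaults


class PagedMySqlToGcsOperator(MySqlToGoogleCloudStorageOperator):
    """Runs the parent MySQL -> GCS export once per page of orders_id values."""

    @apply_defaults
    def __init__(self, base_sql, id_start, id_end, page_size, filename, **kwargs):
        # filename is expected to contain a '{page}' placeholder, for example
        # 'orders/page_{page}_{{}}.json'; the '{}' part is kept for the parent
        # operator's own file-splitting counter.
        super(PagedMySqlToGcsOperator, self).__init__(
            sql=base_sql, filename=filename, **kwargs)
        self.base_sql = base_sql
        self.id_start = id_start
        self.id_end = id_end
        self.page_size = page_size
        self.filename_pattern = filename

    def execute(self, context):
        page, start = 0, self.id_start
        while start <= self.id_end:
            end = min(start + self.page_size - 1, self.id_end)
            # Rewrite the query and the target filename for this page, then let
            # the parent operator do the actual export.
            self.sql = '{0} where orders_id between {1} and {2}'.format(
                self.base_sql, start, end)
            self.filename = self.filename_pattern.format(page=page)
            super(PagedMySqlToGcsOperator, self).execute(context)
            start, page = end + 1, page + 1

The id range is passed in at construction time only to keep the sketch short; it could equally be derived inside execute(), for example from a min/max query, so that the whole paged export lives in one task.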
You cannot, however, have a PythonOperator (or any operator) modify the DAG and have the change recognized and scheduled. The most it could do is one of the following:

- Instantiate a MySqlToGoogleCloudStorageOperator with the clause and call execute() on it right within the PythonOperator's callable (see the sketch below). This will work, and you will see log messages from the MySqlToGoogleCloudStorageOperator right in the PythonOperator's logs.
- Use a PythonOperator or TriggerDagRunOperator to trigger another DAG containing just the MySqlToGoogleCloudStorageOperator, passing in the clause as a parameter or pushing it to XCom for that DAG first. That other DAG should probably have its schedule set to None. This makes following the logs a little harder, but it could run the DAGs in parallel.

If it were my DAG, I think my approach instead (if not subclassing) would be to always process 1 to X pages. Suppose your DAG should handle a maximum of X pages of results, where X is 10 for example. Then define 10 branches off chunck_import_op's parent; you won't need chunck_import_op or the callable anymore.
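For reference, here is a minimal sketch of the first bulleted option above (instantiating the operator and calling execute() inside the PythonOperator's callable). The bucket, filenames, connection ids, and the get_id_range() helper are placeholders of my own, and a page size of 50000 is assumed:

# Hypothetical sketch: run the export synchronously inside the PythonOperator.
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


def chunck_import(**kwargs):
    start, end = get_id_range()   # hypothetical helper: overall orders_id range
    page_size = 50000             # assumed page size
    num_pages = (end - start) // page_size + 1
    for i in range(num_pages):
        page_start = start + i * page_size
        page_end = min(page_start + page_size - 1, end)
        parameter_where = 'where orders_id between {0} and {1}'.format(page_start, page_end)
        export_op = MySqlToGoogleCloudStorageOperator(
            task_id='import_orders_page_{0}'.format(i),      # not added to the DAG, only run inline
            sql='select * from orders {0}'.format(parameter_where),
            bucket='my-bucket',                               # placeholder
            filename='orders/page_{0}_{{}}.json'.format(i),   # placeholder
            mysql_conn_id='mysql_default',
            google_cloud_storage_conn_id='google_cloud_default')
        # The export runs synchronously here, so its log output ends up in
        # this PythonOperator task's log.
        export_op.execute(context=kwargs)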
Each of the 10 branches consists of:

- A ShortCircuitOperator which calls the same callable with a different offset argument (0 through 9). The callable checks whether offset * page_size is greater than end; if so, it returns False, which skips that branch's downstream operators. Otherwise it pushes a valid query for that offset's range into XCom and returns True so they run.
- A MySqlToGoogleCloudStorageOperator whose query is set to {{ ti.xcom_pull('<ShortCircuitOperator_N>') }}, where the string is the task_id of the preceding ShortCircuitOperator.
- If you have other operators downstream of these MySqlToGoogleCloudStorageOperators, first add a DummyOperator as a child of all of them with its trigger_rule set to ALL_DONE, then add the other operators as children of that one.

This way you can run 1 to 10 paged queries as needed (a sketch of this layout is below). The pages might run in parallel; I don't think that's a problem, but keep it in mind.
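To make the branch layout concrete, here is a minimal sketch under assumptions of my own: a fixed PAGE_SIZE, a hypothetical get_id_range() helper standing in for however you derive the overall start/end, and placeholder bucket/filenames. The callable returns the query string itself, which is truthy, so the ShortCircuitOperator both lets the branch run and pushes the query to XCom as its return value:

# Hypothetical sketch of the fixed 1-to-10-page branch layout described above.
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule

MAX_PAGES = 10
PAGE_SIZE = 50000  # assumed page size


def page_query(offset):
    start, end = get_id_range()  # hypothetical helper: overall orders_id range
    page_start = start + offset * PAGE_SIZE
    if page_start > end:
        return False  # falsy -> ShortCircuitOperator skips this branch
    page_end = min(page_start + PAGE_SIZE - 1, end)
    # A non-empty string is truthy: the branch runs, and the string is pushed
    # to XCom as this task's return value for the downstream export to pull.
    return 'select * from orders where orders_id between {0} and {1}'.format(
        page_start, page_end)


pages_done = DummyOperator(
    task_id='pages_done',
    trigger_rule=TriggerRule.ALL_DONE,  # run even when some branches were skipped
    dag=dag)

for n in range(MAX_PAGES):
    check_page = ShortCircuitOperator(
        task_id='check_page_{0}'.format(n),
        python_callable=page_query,
        op_kwargs={'offset': n},
        dag=dag)
    import_page = MySqlToGoogleCloudStorageOperator(
        task_id='import_orders_page_{0}'.format(n),
        sql="{{{{ ti.xcom_pull('check_page_{0}') }}}}".format(n),
        bucket='my-bucket',                              # placeholder
        filename='orders/page_{0}_{{}}.json'.format(n),  # placeholder
        dag=dag)
    start_task_op >> check_page >> import_page >> pages_done

Any further downstream operators would then hang off pages_done rather than off the individual export tasks.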
Upvotes: 1
Reputation: 7815
Airflow provides the XComs mechanism for the tasks (operators) to communicate among themselves. In your concrete scenario, the chunck_import task can precompute all where clauses first and push them into an XCom; then the import_orders task can pull the XCom, read all where clauses, and use them as needed.
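For illustration, a minimal sketch of that hand-off might look like the following. It assumes the callable simply returns the list of clauses (so the PythonOperator pushes it to XCom as its return value); the get_id_range() helper, page size, bucket, and filename are placeholders of mine:

# Hypothetical sketch: push all where clauses from chunck_import, pull one downstream.
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator


def chunck_import(**kwargs):
    start, end = get_id_range()  # hypothetical helper: overall orders_id range
    page_size = 50000            # assumed page size
    num_pages = (end - start) // page_size + 1
    clauses = []
    for i in range(num_pages):
        page_start = start + i * page_size
        page_end = min(page_start + page_size - 1, end)
        clauses.append('where orders_id between {0} and {1}'.format(page_start, page_end))
    return clauses  # the PythonOperator pushes the return value to XCom


chunck_import_op = PythonOperator(
    task_id='chunck_import',
    python_callable=chunck_import,
    dag=dag)

# The sql field is templated, so a single clause can be pulled from the list by index.
import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    sql="select * from orders {{ ti.xcom_pull('chunck_import')[0] }}",
    bucket='my-bucket',                # placeholder
    filename='orders/page_0_{}.json',  # placeholder
    dag=dag)

chunck_import_op >> import_orders_op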
If this mechanism does not work for your application logic, then please amend your question and explain why not.
Upvotes: 0