Reputation: 25
I have a variable whose value I'd like to push to Airflow's XCom so I can use it as input for the next task. I know I must use XComs, but I haven't figured out how to push a value from the Spark task back to Airflow.
def c_count():
    return spark_task(
        name='c_count',
        script='c_count.py',
        dag=dag,
        table=None,
        host=Variable.get('host'),
        trigger_rule="all_done",
        provide_context=True,
        xcom_push=True
    )
def c_int():
    return spark_task(
        name='c_in',
        script='another_test.py',
        dag=dag,
        table=None,
        host=Variable.get('host'),
        trigger_rule="all_done",
        counts="{{ task_instance.xcom_pull(task_ids='c_count') }}"
    )
EDIT: The spark task is the following:
def spark_task_sapbw(name, script, dag, table, host, **kwargs):
    spark_cmd = 'spark-submit'
    if Variable.get('spark_master_uri', None):
        spark_cmd += ' --master {}'.format(Variable.get('spark_master_uri'))
    .
    .
    .
    task = BashOperator(
        task_id=name,
        bash_command=spark_cmd,
        dag=dag,
        **kwargs
    )
    return task
The problem is that what gets pushed to XCom is just the last line printed in the task's log, since `BashOperator` with `xcom_push=True` pushes the last line of the command's stdout. Is there any way to get a specific value back from the Spark script instead? Thank you!
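To make the behavior concrete, here is a pure-Python sketch (a simplification, not Airflow's actual implementation) of how a `BashOperator` with `xcom_push=True` derives the XCom value from a command's output: it keeps only the final line, so whatever the Spark job prints last, typically a stray log line, is what ends up in XCom.

```python
def last_stdout_line(output: str) -> str:
    """Simplified stand-in for BashOperator's xcom_push behavior:
    return the last non-empty line of the command's stdout."""
    lines = [line for line in output.splitlines() if line.strip()]
    return lines[-1] if lines else ''

# A spark-submit run typically emits many log lines; only the final
# print of the script survives as the XCom value:
sample_output = "INFO starting job\nINFO stage 1 done\n12345\n"
value = last_stdout_line(sample_output)
```

This is why placing `print(my_value)` as the very last statement of the Spark script makes `my_value` retrievable via `xcom_pull` in the downstream task.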
Upvotes: 1
Views: 1945