Michael Germ

Reputation: 25

Push variable from Spark to Airflow

I have a variable whose value I'd like to push to Airflow so I can use it as an input for the next task. I know that I must use XComs, but I haven't figured out how to push from the Spark task to Airflow.

def c_count():
    return spark_task(
        name='c_count',
        script='c_count.py',
        dag=dag,
        table=None,
        host=Variable.get('host'),
        trigger_rule="all_done",
        provide_context=True,
        xcom_push=True
    )





def c_int():
    return spark_task(
        name='c_in',
        script='another_test.py',
        dag=dag,
        table=None,
        host=Variable.get('host'),
        trigger_rule="all_done",
        counts="{{ task_instance.xcom_pull(task_ids='c_count') }}"
    )

EDIT: The Spark task is the following:

def spark_task_sapbw(name, script, dag, table, host, **kwargs):

    spark_cmd = 'spark-submit'

    if Variable.get('spark_master_uri', None):
        spark_cmd += ' --master {}'.format(Variable.get('spark_master_uri'))
    ...


    task = BashOperator(
        task_id=name,
        bash_command=spark_cmd,
        dag=dag,
        **kwargs
    )
    return task

The problem is that what I get back is the last line printed to the Airflow log. Is there any way to get a specific value back from the Spark script? Thank you!
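For context on the symptom above: `BashOperator` with `xcom_push=True` pushes the last line the command writes to stdout, which is why the XCom value ends up being whatever Spark logged last. A minimal stand-in (no Airflow dependency) illustrating that rule:

```python
# Illustration only: mimics what BashOperator(xcom_push=True) pushes,
# namely the last line written to stdout by the bash command.
def last_stdout_line(captured: str) -> str:
    """Return the last line of the captured stdout, as BashOperator would push it."""
    return captured.rstrip("\n").splitlines()[-1]

# If the Spark script prints the value last, the XCom is the value:
spark_stdout = "INFO SparkContext: Stopped\n12345\n"
print(last_stdout_line(spark_stdout))  # 12345

# If Spark's own shutdown logging prints after the value, the XCom is the log line:
noisy_stdout = "12345\nINFO ShutdownHookManager: done\n"
print(last_stdout_line(noisy_stdout))  # INFO ShutdownHookManager: done
```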

Upvotes: 1

Views: 1945

Answers (1)

Steven

Reputation: 15258

You cannot make Spark and Airflow communicate directly. You have to use Python in between: collect the values you need in Python, then push them to Airflow with XComs.
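One way to sketch this (the script name `c_count.py`, the parsing helper, and the DAG wiring are illustrative, matching the question's setup): run `spark-submit` from a Python callable, capture its stdout yourself, and return the parsed value; a `PythonOperator` pushes the callable's return value to XCom automatically.

```python
import subprocess

def parse_last_line(output: str) -> str:
    """Return the last non-empty line of captured stdout."""
    lines = [l for l in output.splitlines() if l.strip()]
    return lines[-1] if lines else ""

def collect_count(**kwargs):
    # Run spark-submit ourselves so we control what gets captured,
    # instead of relying on BashOperator's "last line of stdout" rule.
    # Assumes c_count.py prints the count as its final stdout line.
    result = subprocess.run(
        ["spark-submit", "c_count.py"],
        capture_output=True, text=True, check=True,
    )
    return parse_last_line(result.stdout)  # return value is pushed to XCom

# In the DAG file, wire it up with a PythonOperator (Airflow 1.x style,
# matching the question's use of provide_context):
#
# from airflow.operators.python_operator import PythonOperator
# count_task = PythonOperator(
#     task_id="c_count",
#     python_callable=collect_count,
#     provide_context=True,
#     dag=dag,
# )
```

The downstream task can then pull the value with `{{ task_instance.xcom_pull(task_ids='c_count') }}`, exactly as in the question.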

Upvotes: 2

Related Questions