Ajith Kannan

Reputation: 842

Airflow XCom Push from SparkSubmitOperator not working

I have an Airflow DAG in which one task launches a Spark job and the next task parses the application logs to find the Spark application ID. I enable XCom push (do_xcom_push=True) on the spark-submit task and call xcom_pull in the next task.

import re
import logging

logger = logging.getLogger(__name__)

def extract_app_id(**kwargs):
    ti = kwargs['ti']
    log = ti.xcom_pull(task_ids='submit_spark_job')
    log_str = str(log)
    logger.info("XCom pulled log: %s", log_str)
    match = re.search(r'application_\d+_\d+', log_str)
    return match.group(0) if match else None


spark_submit_task = SparkSubmitOperator(
        name=f"{job_name}",
        task_id="submit_spark_job",
        conn_id="spark3",
        conf=conf,
        java_class="Application",
        application=f"{jar_path}{jar_file}",
        do_xcom_push=True,
        application_args=application_args,
        execution_timeout=timedelta(minutes=5) 
    )

extract_app_id_task = PythonOperator(
        task_id='extract_app_id',
        python_callable=extract_app_id,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE
   )

spark_submit_task >> extract_app_id_task

The issue is that spark_submit_task successfully launches the Spark job, and the logs print the Spark application ID, as in this excerpt from the actual log:

4/06/14 22:37:37 INFO Client: Application report for application_1718047116285_4363 (state: RUNNING)
Identified spark driver id: application_1718047116285_4363

But xcom_pull always returns None, even though the Spark job is launched successfully.

I am using Python 3 and Airflow 2.0.0.

Upvotes: 0

Views: 244

Answers (1)

Illia Kaltovich

Reputation: 130

I believe the issue is that SparkSubmitOperator does not push any XCom value by default. In my experience, a simple check is to look at what the operator's execute method returns in the source code. Looking at it, there is no return statement, only a hook that spins up the job with the required configuration: https://github.com/apache/airflow/blob/providers-apache-spark/4.7.1/airflow/providers/apache/spark/operators/spark_submit.py#L30

For comparison, the AthenaOperator returns a QueryExecutionID once query processing finishes. See: https://github.com/apache/airflow/blob/providers-amazon/8.21.0/airflow/providers/amazon/aws/operators/athena.py#L181

With that in mind, you could customize the existing operator if needed, or choose an alternative approach.
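As a sketch of what such a customization could look like: a subclass could override execute so that it returns the YARN application ID, which Airflow then pushes to XCom automatically. The helper below mirrors the log-parsing step with plain stdlib code; the commented-out subclass and its attribute names (SparkSubmitWithAppIdOperator, _hook, _yarn_application_id) are illustrative assumptions about the provider's internals, not guaranteed API.

```python
import re
from typing import Optional

# The regex matches YARN IDs such as application_1718047116285_4363,
# as seen in the spark-submit client logs above.
APP_ID_PATTERN = re.compile(r"application_\d+_\d+")

def parse_application_id(log_text: str) -> Optional[str]:
    """Return the first YARN application ID found in the log text, if any."""
    match = APP_ID_PATTERN.search(log_text)
    return match.group(0) if match else None

# Hypothetical subclass (names are assumptions and rely on private
# attributes of the apache-spark provider, so they may break on upgrade):
#
# class SparkSubmitWithAppIdOperator(SparkSubmitOperator):
#     def execute(self, context):
#         super().execute(context)
#         # The hook parses the application ID out of the spark-submit
#         # output internally; returning it here pushes it to XCom.
#         return getattr(self._hook, "_yarn_application_id", None)
```

With a return value in place, the downstream task's `ti.xcom_pull(task_ids='submit_spark_job')` would receive the application ID directly instead of None.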

Upvotes: 1

Related Questions