Reputation: 842
I have an Airflow DAG which launches a Spark job in one task, and the next task extracts the application logs to find the Spark application ID. I enable XCom push in the spark-submit task and use xcom_pull in the next task.
import logging
import re
from datetime import timedelta

from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.trigger_rule import TriggerRule

logger = logging.getLogger(__name__)

def extract_app_id(**kwargs):
    ti = kwargs['ti']
    log = ti.xcom_pull(task_ids='submit_spark_job')
    log_str = str(log)
    logger.info("Xcom pulled str log %s", log_str)
    app_id = re.search(r'application_\d+_\d+', log_str)
    return app_id.group(0) if app_id else None
spark_submit_task = SparkSubmitOperator(
    name=f"{job_name}",
    task_id="submit_spark_job",
    conn_id="spark3",
    conf=conf,
    java_class="Application",
    application=f"{jar_path}{jar_file}",
    do_xcom_push=True,
    application_args=application_args,
    execution_timeout=timedelta(minutes=5),
)
extract_app_id_task = PythonOperator(
    task_id='extract_app_id',
    python_callable=extract_app_id,
    provide_context=True,
    trigger_rule=TriggerRule.ALL_DONE,
)

spark_submit_task >> extract_app_id_task
The issue is that spark_submit_task successfully launches the Spark job, and the task logs print the Spark application ID; the lines below are from the actual log:
4/06/14 22:37:37 INFO Client: Application report for application_1718047116285_4363 (state: RUNNING)
Identified spark driver id: application_1718047116285_4363
But xcom_pull always returns None, even though the Spark job is successfully launched. I am using Python 3 and Airflow 2.0.0.
Upvotes: 0
Views: 244
Reputation: 130
I believe the issue is that SparkSubmitOperator does not push any XCom value by default. In my experience, a simple check is to look at what return value, if any, appears at the end of the operator's execute method in the source code. I looked into it, and there is no return statement, only a hook call that spins up the job with the required configuration: https://github.com/apache/airflow/blob/providers-apache-spark/4.7.1/airflow/providers/apache/spark/operators/spark_submit.py#L30
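For reference, the relevant part of execute in the linked file is essentially the following (paraphrased here, so the exact body may differ slightly between provider versions):

def execute(self, context):
    """Call the SparkSubmitHook to run the provided spark job."""
    self._hook = self._get_hook()
    self._hook.submit(self._application)  # no return statement, so the pushed XCom value is None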
For comparison, the AthenaOperator does return a value, the QueryExecutionId, once the query has been submitted and processing is finished. See: https://github.com/apache/airflow/blob/providers-amazon/8.21.0/airflow/providers/amazon/aws/operators/athena.py#L181
With that in mind, I would suggest either customizing the existing operator so that its execute method returns the application ID, or deciding on an alternative approach.
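As a minimal sketch of the first option (untested, and it relies on the private _yarn_application_id attribute that SparkSubmitHook populates while parsing the spark-submit output in YARN cluster mode, so verify it against your provider version):

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

class SparkSubmitWithAppIdOperator(SparkSubmitOperator):
    # Variant of SparkSubmitOperator whose execute() returns the YARN application ID.

    def execute(self, context):
        # Run the normal submit logic; this populates self._hook internally.
        super().execute(context)
        # _yarn_application_id is a private hook attribute parsed from the
        # spark-submit log output. Returning it makes Airflow push it to XCom
        # (do_xcom_push defaults to True), so a downstream
        # ti.xcom_pull(task_ids='submit_spark_job') would yield the
        # application_... string instead of None.
        return getattr(self._hook, "_yarn_application_id", None)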
Upvotes: 1