Reputation: 1095
I am new to Airflow and I am trying to schedule a PySpark job in Airflow deployed in Docker containers. Here is my DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta

spark_master = "spark://spark:7077"
spark_app_name = "Spark Hello World"
now = datetime.now()

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(now.year, now.month, now.day),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG(
    dag_id="spark-test",
    description="This DAG runs a simple Pyspark app.",
    default_args=default_args,
    schedule_interval=timedelta(1)
)

# Task 1: start marker
t1 = DummyOperator(task_id="start", dag=dag)

# Task 2: check if the application file exists
t2 = BashOperator(
    task_id="check_file_exists",
    bash_command="shasum /usr/local/spark/app/first.py",
    retries=2,
    retry_delay=timedelta(seconds=15),
    dag=dag
)

# Task 3: submit the PySpark application
t3 = SparkSubmitOperator(
    task_id="spark_job",
    application="/usr/local/spark/app/first.py",
    name=spark_app_name,
    conn_id="spark_default",
    conf={"spark.master": spark_master},
    dag=dag
)

t1 >> t2 >> t3
My Python script is first.py:
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("app")
    sc = SparkContext(conf=conf)

    # Word count over the input text file
    text_file = sc.textFile("/usr/local/spark/resources/data/Loren.txt")
    counts = text_file.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile("/usr/local/spark/resources/data/loren_counts_task4")
The error I'm getting is FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'. Here is the task log:
Reading local file: /usr/local/airflow/logs/spark-test/spark_job/2021-07-09T20:46:19.130980+00:00/2.log
[2021-07-09 20:47:50,119] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: spark-test.spark_job 2021-07-09T20:46:19.130980+00:00 [queued]>
[2021-07-09 20:47:50,151] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: spark-test.spark_job 2021-07-09T20:46:19.130980+00:00 [queued]>
[2021-07-09 20:47:50,152] {{taskinstance.py:866}} INFO -
--------------------------------------------------------------------------------
[2021-07-09 20:47:50,152] {{taskinstance.py:867}} INFO - Starting attempt 2 of 2
[2021-07-09 20:47:50,152] {{taskinstance.py:868}} INFO -
--------------------------------------------------------------------------------
[2021-07-09 20:47:50,165] {{taskinstance.py:887}} INFO - Executing <Task(SparkSubmitOperator): spark_job> on 2021-07-09T20:46:19.130980+00:00
[2021-07-09 20:47:50,169] {{standard_task_runner.py:53}} INFO - Started process 19335 to run task
[2021-07-09 20:47:50,249] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: spark-test.spark_job 2021-07-09T20:46:19.130980+00:00 [running]> 9b6d4f74ee93
[2021-07-09 20:47:50,293] {{logging_mixin.py:112}} INFO - [2021-07-09 20:47:50,292] {{base_hook.py:84}} INFO - Using connection to: id: spark_default. Host: yarn, Port: None, Schema: None, Login: None, Password: None, extra: XXXXXXXX
[2021-07-09 20:47:50,294] {{logging_mixin.py:112}} INFO - [2021-07-09 20:47:50,294] {{spark_submit_hook.py:323}} INFO - Spark-Submit cmd: spark-submit --master yarn --conf spark.master=spark://spark:7077 --name Spark Hello World --queue root.default usr/local/spark/app/first.py
[2021-07-09 20:47:50,301] {{taskinstance.py:1128}} ERROR - [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/spark_submit_operator.py", line 187, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 393, in submit
    **kwargs)
  File "/usr/local/lib/python3.7/subprocess.py", line 800, in __init__
    restore_signals, start_new_session)
  File "/usr/local/lib/python3.7/subprocess.py", line 1551, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'
[2021-07-09 20:47:50,304] {{taskinstance.py:1170}} INFO - All retries failed; marking task as FAILED.dag_id=spark-test, task_id=spark_job, execution_date=20210709T204619, start_date=20210709T204750, end_date=20210709T204750
[2021-07-09 20:48:00,096] {{logging_mixin.py:112}} INFO - [2021-07-09 20:48:00,095] {{local_task_job.py:103}} INFO - Task exited with return code 1
I ran spark-submit on the Spark container and it works perfectly. I am not sure what is wrong.
Upvotes: 1
Views: 2954
Reputation: 11
You should see this link: Apache Spark and Apache Airflow connection in Docker based solution.
From the error, the command being built is:
spark-submit --master yarn --conf spark.master=spark://spark:7077 --name Spark Hello World --queue root.default
It has to be:
spark-submit --master spark://spark:7077 --conf spark.master=spark://spark:7077 --name Spark Hello World --queue root.default
You get that by setting the master in the Airflow connection for this Spark conn id (spark_default); see the sketch after these settings:
Conn Type: Spark (if Spark is not in the list, you should install apache-airflow-providers-apache-spark in the Airflow Docker image.)
Host: spark://spark
Port: 7077
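For illustration, here is a minimal sketch of the operator from the question once spark_default carries the master as above. It assumes the connection is configured exactly as listed, in which case the conf override of spark.master becomes redundant and can be dropped:

# Sketch: reuses dag and spark_app_name from the DAG file above.
# With spark_default pointing at spark://spark:7077, the operator builds
# --master from the connection instead of defaulting to yarn.
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

t3 = SparkSubmitOperator(
    task_id="spark_job",
    application="/usr/local/spark/app/first.py",
    name=spark_app_name,
    conn_id="spark_default",
    dag=dag,
)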
I am not sure whether this is your docker-compose file or not:
https://github.com/puckel/docker-airflow/blob/master/docker-compose-LocalExecutor.yml
If you want to install the package in the container, you should edit the second line of the webserver service, from
webserver:
    image: puckel/docker-airflow:1.10.9
    restart: always
to
webserver:
    build: ./airflow
    restart: always
This is the airflow directory (the one referenced by build: ./airflow).
Dockerfile:
FROM puckel/docker-airflow:1.10.9
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
RUN rm -rf requirements.txt
requirements.txt:
apache-airflow-providers-apache-spark==X.X.X
Use the version that is compatible with your Airflow version; you can find it here: https://pypi.org/project/apache-airflow-providers-apache-spark/
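As a quick sanity check (a sketch, assuming the provider package installed cleanly), you can try importing it inside the Airflow container. Note that this import path belongs to the provider package, not the old airflow.contrib path used in the question:

# Sketch: should succeed once apache-airflow-providers-apache-spark is installed.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
print("spark provider is importable")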
Maybe you should also run the spark-submit command yourself (in the container) to see what is going on and fix the error there. I hope you can fix it.
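For example, here is a minimal, purely illustrative check you could run from inside the Airflow container; it only tells you whether the spark-submit binary is on the PATH, which is what the FileNotFoundError above points at:

# Sketch: check whether spark-submit is available on PATH in this container.
import shutil

path = shutil.which("spark-submit")
print(path if path else "spark-submit not found on PATH in this container")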
Upvotes: 1