ds_user

Reputation: 2179

airflow spark-submit operator - No such file or directory

I am trying to schedule a Spark job in Airflow; here is my DAG:

from __future__ import print_function
import airflow
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import DAG
from datetime import datetime

APPLICATION_FILE_PATH = "/home/ubuntu/airflow/dags/scripts/"


default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'retries': 0,
    }


dag = DAG('datatodb', default_args=default_args, start_date=datetime(2018, 7, 28), schedule_interval='0 5 * * *')


data_to_db = SparkSubmitOperator(
    task_id='data_to_db',
    application=APPLICATION_FILE_PATH+"ds_load.py",
    dag=dag,
    run_as_user='ubuntu',
    application_args=["{{ ds }}"]
)

data_to_db
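
For what it's worth, since no conn_id is set above, SparkSubmitOperator falls back to the spark_default connection and, as far as I understand, ultimately launches spark-submit through subprocess.Popen, roughly like the sketch below (the master URL and date are illustrative, not my real config):

import subprocess

# Rough sketch of what the hook effectively runs; values are illustrative.
cmd = [
    "spark-submit",                                    # must resolve on the worker's PATH
    "--master", "local[*]",                            # master comes from the spark_default connection
    "/home/ubuntu/airflow/dags/scripts/ds_load.py",    # the DAG's application file
    "2018-07-31",                                      # the rendered {{ ds }} argument
]
subprocess.Popen(cmd)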

And my Python script is this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit  # lit is used below but was missing from the original imports
import sys


def write_to_db(previous_day, spark_session):
  # Read the previous day's partition from S3, drop the partition columns,
  # and tag each row with the load date.
  drop_cols = ['created_date', 'Year', 'Month']
  datapath = "s3a://***/"
  s3path = datapath + 'date=' + str(previous_day)
  data_to_load_df = spark_session.read.parquet(s3path).drop(*drop_cols).withColumn('date', lit(previous_day))

  # Append the day's data to the MySQL report table over JDBC.
  data_to_load_df.write.format('jdbc').options(url='jdbc:mysql://servername:3306/dbname',
       driver='com.mysql.jdbc.Driver',
       dbtable='report_table',
       user='****',
       password="***").mode('append').save()

def main(previous_day, spark_session=None):
  if spark_session is None:
    spark_session = SparkSession.builder.appName("s3_to_db").getOrCreate()

  write_to_db(previous_day, spark_session)

if __name__ == "__main__":
    previous_day = sys.argv[1]
    main(previous_day)

I am not sure what is wrong with this; I keep getting this error:

[2018-08-01 02:08:37,278] {base_task_runner.py:98} INFO - Subtask: [2018-08-01 02:08:37,278] {base_hook.py:80} INFO - Using connection to: local
[2018-08-01 02:08:37,298] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-08-01 02:08:37,299] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/bin/airflow", line 27, in <module>
[2018-08-01 02:08:37,299] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-08-01 02:08:37,299] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-08-01 02:08:37,299] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-08-01 02:08:37,299] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-08-01 02:08:37,299] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-08-01 02:08:37,300] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-08-01 02:08:37,300] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-08-01 02:08:37,300] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
[2018-08-01 02:08:37,300] {base_task_runner.py:98} INFO - Subtask:     self._hook.submit(self._application)
[2018-08-01 02:08:37,300] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
[2018-08-01 02:08:37,300] {base_task_runner.py:98} INFO - Subtask:     **kwargs)
[2018-08-01 02:08:37,301] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/lib/python2.7/subprocess.py", line 394, in __init__
[2018-08-01 02:08:37,301] {base_task_runner.py:98} INFO - Subtask:     errread, errwrite)
[2018-08-01 02:08:37,301] {base_task_runner.py:98} INFO - Subtask:   File "/home/ubuntu/anaconda2/lib/python2.7/subprocess.py", line 1047, in _execute_child
[2018-08-01 02:08:37,301] {base_task_runner.py:98} INFO - Subtask:     raise child_exception
[2018-08-01 02:08:37,301] {base_task_runner.py:98} INFO - Subtask: OSError: [Errno 2] No such file or directory

I checked that the Python script is at that path, and it's there; I also checked the MySQL driver, and it's in the jars folder. The error message does not tell me which file is missing. Can anyone help me with this?
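
From reading the traceback, my understanding is that the OSError is raised by subprocess in _execute_child, which means it is the executable being launched (spark-submit) that cannot be found, not the application file. Here is a quick check, runnable under the same Python 2.7 environment, to see whether the binary resolves for the user Airflow runs as:

# Sanity check: does spark-submit resolve on the PATH of the user running
# the Airflow worker? (Python 2.7 compatible.)
from distutils.spawn import find_executable

print(find_executable("spark-submit") or "spark-submit not found on PATH")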

Upvotes: 1

Views: 3754

Answers (1)

ds_user

Reputation: 2179

Answering my own question, since I resolved it: I just had to restart the Airflow webserver and scheduler after placing the driver JARs in the spark/jars folder. It worked fine after that.
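
As a related note, depending on the Airflow version, the SparkSubmitHook can also be pointed at a specific Spark installation through the spark_default connection's extra JSON, rather than relying on whatever PATH the scheduler process inherited. A sketch of such an extra field; the install path here is only an example:

# Hypothetical extra JSON for the spark_default connection. With
# "spark-home" set, the hook runs <spark-home>/bin/spark-submit rather
# than relying on PATH. "/opt/spark" is an assumed install location.
extra = {
    "spark-home": "/opt/spark",
    "deploy-mode": "client",
}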

Upvotes: 3
