poiuyqwerty0910

Reputation: 65

Airflow glue job

I want to pass the Glue job arguments from Airflow instead of hard-coding them in the script. I am trying the following, but it doesn't work:

Upvotes: 2

Views: 3373

Answers (1)

Elad Kalif

Reputation: 15931

The error you shared indicates that you are running an older version of the Amazon provider.

For this to work you must have apache-airflow-providers-amazon>=2.3.0.
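
To confirm which provider version is installed before relying on `run_kwargs`, something like the following can help. It is a minimal sketch using only the standard library: `parse_release` and `supports_run_kwargs` are hypothetical helpers (not part of Airflow), and the comparison deliberately ignores pre-release suffixes such as `rc1`. If the check fails, upgrading with `pip install --upgrade "apache-airflow-providers-amazon>=2.3.0"` is one option.

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple

# Minimum provider version the answer gives for initialize_job(run_kwargs=...).
MIN_PROVIDER_VERSION = (2, 3, 0)


def parse_release(version_string: str) -> Tuple[int, ...]:
    """Hypothetical helper: extract the leading numeric release, e.g. '2.3.0rc1' -> (2, 3, 0).

    Simplified on purpose: pre-release/dev suffixes are ignored, so '2.3.0rc1'
    compares equal to '2.3.0'. Use packaging.version for full PEP 440 ordering.
    """
    match = re.match(r"\d+(?:\.\d+)*", version_string)
    if match is None:
        raise ValueError(f"Unrecognised version string: {version_string!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def supports_run_kwargs(version_string: str) -> bool:
    """Hypothetical helper: True if the version is at least MIN_PROVIDER_VERSION."""
    release = parse_release(version_string)
    # Pad to three components so that e.g. '3.0' compares like '3.0.0'.
    padded = release + (0,) * (3 - len(release))
    return padded >= MIN_PROVIDER_VERSION


def installed_provider_version() -> Optional[str]:
    """Return the installed apache-airflow-providers-amazon version, or None."""
    try:
        return version("apache-airflow-providers-amazon")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    installed = installed_provider_version()
    if installed is None:
        print("apache-airflow-providers-amazon is not installed")
    else:
        print(installed, "supports run_kwargs:", supports_run_kwargs(installed))
```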

Usage Example:

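from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

# Extra keyword arguments forwarded as-is to boto3's Glue start_job_run call
some_run_kwargs = {"NumberOfWorkers": 5}
# Job arguments passed to the Glue script; keys need the "--" prefix
some_script_arguments = {"--var1": "value"}
glue_job_hook = GlueJobHook(
    job_name='aws_test_glue_job',
    desc='This is test case job from Airflow',
    iam_role_name='my_test_role',
    script_location="s3://glue-examples/glue-scripts/sample_aws_glue_job.py",
    s3_bucket="my-includes",
    region_name="us-west-2",
)
# Creates the job if it does not exist, then starts a run;
# returns the start_job_run response (which includes the JobRunId)
glue_job_run = glue_job_hook.initialize_job(
    script_arguments=some_script_arguments,
    run_kwargs=some_run_kwargs
)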
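
To see how the two dictionaries are used, the sketch below mirrors the body of `initialize_job` (shown in the backport further down) as a standalone function, with `unittest.mock.MagicMock` standing in for the boto3 Glue client so it runs offline. `start_glue_run` and the `"jr_example"` run ID are hypothetical, for illustration only: `script_arguments` ends up in the `Arguments` parameter, while each `run_kwargs` entry becomes its own top-level `start_job_run` parameter.

```python
from typing import Any, Dict, Optional
from unittest.mock import MagicMock


def start_glue_run(
    glue_client: Any,
    job_name: str,
    script_arguments: Optional[dict] = None,
    run_kwargs: Optional[dict] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in mirroring the body of initialize_job."""
    script_arguments = script_arguments or {}
    run_kwargs = run_kwargs or {}
    # script_arguments -> Arguments; run_kwargs -> top-level parameters such as NumberOfWorkers
    return glue_client.start_job_run(
        JobName=job_name, Arguments=script_arguments, **run_kwargs
    )


# MagicMock replaces the real boto3 Glue client so the call can be inspected.
fake_client = MagicMock()
fake_client.start_job_run.return_value = {"JobRunId": "jr_example"}

result = start_glue_run(
    fake_client,
    "aws_test_glue_job",
    script_arguments={"--var1": "value"},
    run_kwargs={"NumberOfWorkers": 5},
)
fake_client.start_job_run.assert_called_once_with(
    JobName="aws_test_glue_job",
    Arguments={"--var1": "value"},
    NumberOfWorkers=5,
)
print(result["JobRunId"])
```

Inside the Glue script itself, job arguments like `--var1` are typically read with `awsglue.utils.getResolvedOptions(sys.argv, ["var1"])`.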

If you are using apache-airflow-providers-amazon<2.3.0, you can create a custom hook by backporting the code added in PR:

from typing import Dict, Optional

from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook

class MyGlueJobHook(AwsGlueJobHook):

    def initialize_job(
        self,
        script_arguments: Optional[dict] = None,
        run_kwargs: Optional[dict] = None,
    ) -> Dict[str, str]:
        """
        Initializes connection with AWS Glue
        to run job
        :return:
        """
        glue_client = self.get_conn()
        script_arguments = script_arguments or {}
        run_kwargs = run_kwargs or {}

        try:
            job_name = self.get_or_create_glue_job()
            job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
            return job_run
        except Exception as general_error:
            self.log.error("Failed to run aws glue job, error: %s", general_error)
            raise

You can then use MyGlueJobHook in place of GlueJobHook in the example above.

Upvotes: 4
