Reputation: 65
I want to pass the Glue job arguments from Airflow instead of setting them in the script. I tried the approach below, but it doesn't work:
Upvotes: 2
Views: 3373
Reputation: 15931
The error you shared indicates that you are running an older version of the Amazon provider.
For this to work you must have apache-airflow-providers-amazon>=2.3.0.
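To confirm which provider version is actually installed in the environment that runs your tasks, a small check like the one below may help. It is a minimal sketch: the helper names (parse_version, supports_run_kwargs, installed_provider_version) are hypothetical, and it only compares the numeric part of the version, so a pre-release such as 2.3.0rc1 is treated the same as 2.3.0.

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple

# Minimum provider version named in this answer.
MIN_VERSION = (2, 3, 0)


def parse_version(text: str) -> Tuple[int, ...]:
    """Return the leading numeric components, padded to three parts."""
    match = re.match(r"\d+(?:\.\d+)*", text)
    if match is None:
        raise ValueError(f"unrecognised version string: {text!r}")
    parts = tuple(int(part) for part in match.group(0).split("."))
    # Pad short versions such as "2.3" so tuple comparison behaves as expected.
    return parts + (0,) * max(0, 3 - len(parts))


def supports_run_kwargs(installed: Optional[str]) -> bool:
    """True if the given version string meets the minimum version."""
    if installed is None:
        return False
    return parse_version(installed) >= MIN_VERSION


def installed_provider_version() -> Optional[str]:
    """Look up the installed provider version, or None if it is not installed."""
    try:
        return version("apache-airflow-providers-amazon")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_provider_version()
    print("installed:", found, "| new enough:", supports_run_kwargs(found))
```

Run it with the same Python interpreter that the Airflow scheduler and workers use, since they may have a different set of packages than your local shell.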
Usage Example:
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

# Extra keyword arguments forwarded to the Glue start_job_run call
some_run_kwargs = {"NumberOfWorkers": 5}
# Arguments passed to the Glue script itself
some_script_arguments = {"--var1": "value"}

glue_job_hook = GlueJobHook(
    job_name='aws_test_glue_job',
    desc='This is test case job from Airflow',
    iam_role_name='my_test_role',
    script_location="s3://glue-examples/glue-scripts/sample_aws_glue_job.py",
    s3_bucket="my-includes",
    region_name="us-west-2",
)

glue_job_run = glue_job_hook.initialize_job(
    script_arguments=some_script_arguments,
    run_kwargs=some_run_kwargs
)
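If the values you want to pass start out as ordinary Python settings, you can build the script arguments dictionary from them. The helper below is a hypothetical sketch (to_glue_arguments is not part of Airflow or boto3). It assumes you want every key prefixed with "--" as in the example above, and it converts every value to a string because Glue job arguments are a string-to-string map.

```python
from typing import Any, Dict, Mapping


def to_glue_arguments(params: Mapping[str, Any]) -> Dict[str, str]:
    """Prefix each key with "--" (if missing) and stringify each value."""
    arguments: Dict[str, str] = {}
    for name, value in params.items():
        key = name if name.startswith("--") else f"--{name}"
        arguments[key] = str(value)
    return arguments


# "batch_size" is an illustrative parameter, not one from the original question.
some_script_arguments = to_glue_arguments({"var1": "value", "batch_size": 100})
print(some_script_arguments)
```

The resulting dictionary can be passed as script_arguments to initialize_job in the same way as the hand-written one.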
If you are using apache-airflow-providers-amazon<2.3.0, you can create a custom hook by backporting the code added in the PR:
from typing import Dict, Optional

from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook


class MyGlueJobHook(AwsGlueJobHook):
    def initialize_job(
        self,
        script_arguments: Optional[dict] = None,
        run_kwargs: Optional[dict] = None,
    ) -> Dict[str, str]:
        """
        Initializes connection with AWS Glue
        to run job
        :return:
        """
        glue_client = self.get_conn()
        # Fall back to empty dicts so both parameters stay optional
        script_arguments = script_arguments or {}
        run_kwargs = run_kwargs or {}

        try:
            job_name = self.get_or_create_glue_job()
            # run_kwargs are forwarded as extra keyword arguments to start_job_run
            job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
            return job_run
        except Exception as general_error:
            self.log.error("Failed to run aws glue job, error: %s", general_error)
            raise
Then you can use MyGlueJobHook in place of GlueJobHook in the usage example above.
Upvotes: 4