Reputation: 5
I have tried to break this down in several ways, since this code snippet sits inside an if/elif chain, but it all seems to come down to the AwsLambdaHook giving me trouble. That is, even hardcoding everything down to only the lambda hook and removing the other 'elif' branches does not help. Here is the error I receive in Airflow:
Broken DAG: [/x/y/z/dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1267, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1211, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'AwsLambdaHook' object has no attribute 'update_relative'
I am using the following code, where jobs is retrieved from a config and j is the loop variable for the current Lambda function or Glue job:
from airflow.providers.amazon.aws.hooks.lambda_function import AwsLambdaHook
[...]
input_job_name = list(jobs[j].keys())[0]
[...]
lambda_step = AwsLambdaHook(function_name=input_job_name,
    region_name='us-east-1', log_type='None', qualifier='$LATEST',
    invocation_type='RequestResponse', config=None, aws_conn_id='aws_default')
start >> lambda_step >> end
Nowhere in my code do I reference relative upstream/downstream sequences or anything like that. I'm not sure if I need to, and if so, where?
Upvotes: 0
Views: 1239
Reputation: 38992
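You're using a hook instead of an operator to declare lambda_step in your DAG. The >> syntax only links tasks (operators). A hook is a helper object for talking to an external service and has no update_relative method, which is why the DAG fails to parse even though you never call that method yourself.
There is the AwsLambdaInvokeFunctionOperator, which invokes AWS Lambda functions. I strongly recommend going with this approach.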
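To see why the traceback mentions update_relative, here is a minimal sketch in plain Python. The Task and Hook classes and the try_link helper are toy stand-ins invented for illustration, not Airflow's real classes; they only mimic the one call that fails in the traceback.

```python
# Simplified model of how `>>` wires tasks together. Not Airflow's actual code.


class Task:
    """Toy stand-in for an operator: it can take part in `>>` chains."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def update_relative(self, other, upstream=True):
        # Tasks expose this method; that is what makes them linkable.
        pass

    def __rshift__(self, other):
        # Mirrors the failing line in the traceback: the object on the
        # right-hand side is asked to register its new relative.
        other.update_relative(self, upstream=True)
        self.downstream.append(other)
        return other


class Hook:
    """Toy stand-in for a hook: a plain helper with no task behaviour."""

    def __init__(self, function_name):
        self.function_name = function_name


def try_link(left, right):
    """Return None if `left >> right` works, else the error message."""
    try:
        left >> right
    except AttributeError as exc:
        return str(exc)
    return None


start = Task("start")
print(try_link(start, Task("lambda_step")))  # linking two tasks succeeds
print(try_link(start, Hook("my_function")))  # linking a hook reports the missing attribute
```

The second call fails for the same reason as start >> lambda_step in the question: the right-hand object is not a task, so it has nothing to register the dependency with.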
You can write your lambda step as:
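from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
lambda_step = AwsLambdaInvokeFunctionOperator(
    # Every operator needs a task_id that is unique within the DAG; this name is only illustrative.
    task_id=f'invoke_{input_job_name}',
    function_name=input_job_name,
    qualifier='$LATEST',
    invocation_type='RequestResponse',
)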
Another approach is to use the AwsLambdaHook in a callable for the PythonOperator.
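from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.lambda_function import AwsLambdaHook
#...
def invoke_lambda_function(function_name, qualifier, invocation_type, log_type,
                           aws_conn_id, region_name, config, **context):
    # Assumes a provider version where invoke_lambda takes the invocation settings
    # as keyword arguments. Connection settings go to the hook, and **context
    # absorbs the extra keyword arguments Airflow passes to the callable.
    hook = AwsLambdaHook(aws_conn_id=aws_conn_id, region_name=region_name, config=config)
    result = hook.invoke_lambda(
        function_name=function_name,
        qualifier=qualifier,
        invocation_type=invocation_type,
        log_type=log_type,
    )
    return result
lambda_step = PythonOperator(
    task_id=f'invoke_{input_job_name}',  # illustrative name; must be unique within the DAG
    python_callable=invoke_lambda_function,
    op_kwargs=dict(
        function_name=input_job_name,
        region_name='us-east-1',
        log_type=None,
        qualifier='$LATEST',
        invocation_type='RequestResponse',
        config=None,
        aws_conn_id='aws_default',
    ),
)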
Upvotes: 1