ariana20

Reputation: 5

Python/Airflow AttributeError: 'AwsLambdaHook' object has no attribute 'update_relative'

I have tried to break this down in several ways, since this code snippet comes within an elif block, but it all seems to come down to the AwsLambdaHook giving me trouble. That is, even hardcoding everything down to only the lambda hook and removing the other elif branches does not help. Here is the error I receive in Airflow:

Broken DAG: [/x/y/z/dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1267, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1211, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'AwsLambdaHook' object has no attribute 'update_relative'

I am using the following code, where jobs is retrieved from a config and j is the lambda function/glue job being used in a for loop:

from airflow.providers.amazon.aws.hooks.lambda_function import AwsLambdaHook
[...]
input_job_name = list(jobs[j].keys())[0]
[...]
lambda_step = AwsLambdaHook(function_name=input_job_name,
                            region_name='us-east-1', log_type='None', qualifier='$LATEST',
                            invocation_type='RequestResponse', config=None, aws_conn_id='aws_default')
start >> lambda_step >> end

Nowhere in my code do I reference relative upstream/downstream sequences or anything like that. I'm not sure if I need to, and if so, where?

Upvotes: 0

Views: 1239

Answers (1)

Oluwafemi Sule

Reputation: 38992

You're using a hook instead of an operator to declare the lambda_step for your DAG.
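For intuition, here is a minimal plain-Python sketch (toy classes, not Airflow's real ones) of why `>>` fails on a hook: `set_downstream` calls `update_relative` on each operand, and hooks simply don't define that method:

```python
class MiniOperator:
    """Toy stand-in for Airflow's BaseOperator: >> wires dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def update_relative(self, other, upstream):
        # Record the edge on whichever side Airflow asks us to.
        (self.upstream if upstream else self.downstream).append(other)

    def __rshift__(self, other):
        # set_downstream: the right-hand operand is asked to record the
        # edge via update_relative -- hooks don't define that method.
        other.update_relative(self, upstream=True)
        self.downstream.append(other)
        return other


class MiniHook:
    """Toy stand-in for AwsLambdaHook: a plain client wrapper, not a task."""


start, end = MiniOperator('start'), MiniOperator('end')
start >> end  # fine: both operands are operator-like

try:
    start >> MiniHook()
except AttributeError as exc:
    print(exc)  # 'MiniHook' object has no attribute 'update_relative'
```

This mirrors the traceback in the question: `_set_relatives` blindly calls `update_relative` on whatever sits on the other side of `>>`.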

There is the AwsLambdaInvokeFunctionOperator, which invokes AWS Lambda functions. I strongly recommend going with this approach.

You can write your lambda step as:

from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator

lambda_step = AwsLambdaInvokeFunctionOperator(
    task_id='lambda_step',
    function_name=input_job_name,
    qualifier='$LATEST',
    invocation_type='RequestResponse',
)

Another approach is to use the AwsLambdaHook inside the python_callable of a PythonOperator.

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.lambda_function import AwsLambdaHook

#...

def invoke_lambda_function(**kwargs):
    # Every op_kwargs entry is an AwsLambdaHook constructor argument;
    # invoke_lambda itself only takes the request payload.
    hook = AwsLambdaHook(**kwargs)
    return hook.invoke_lambda(payload='{}')  # '{}' is a placeholder payload

lambda_step = PythonOperator(
    task_id='lambda_step',
    python_callable=invoke_lambda_function,
    op_kwargs=dict(
        function_name=input_job_name,
        region_name='us-east-1',
        log_type=None,
        qualifier='$LATEST',
        invocation_type='RequestResponse',
        config=None,
        aws_conn_id='aws_default'
    )
)

Upvotes: 1
