Reputation: 99
I am trying to trigger a dbt Cloud job from Airflow and cannot find an answer to the error below:
ERROR - run_job() got an unexpected keyword argument 'conf'
I am using the plugin code from https://github.com/sungchun12/airflow-dbt-cloud
"""Main handler method to run the dbt Cloud job and track the job run status"""
job_run_id = self._trigger_job()
print(f"job_run_id = {job_run_id}")
visit_url = f"https://cloud.getdbt.com/#/accounts/{self.account_id}/projects/{self.project_id}/runs/{job_run_id}/"
print(f"Check the dbt Cloud job status! Visit URL:{visit_url}")
while True:
    time.sleep(1)  # make an api call every 1 second
    job_run_status = self._get_job_run_status(job_run_id)
    print(f"job_run_status = {job_run_status}")
    if job_run_status == dbt_job_run_status.SUCCESS:
        print(f"Success! Visit URL: {visit_url}")
        break
    elif (
        job_run_status == dbt_job_run_status.ERROR
        or job_run_status == dbt_job_run_status.CANCELLED
    ):
        raise Exception(f"Failure! Visit URL: {visit_url}")
I am calling the function via the following code
run_dbt_cloud_job = PythonOperator(
    task_id="run_dbt_cloud_job",
    python_callable=dbt_cloud_job_runner_config.run_job,
    provide_context=True,
)
And get the error
run_job() got an unexpected keyword argument 'conf'
I'm not sure why it is not working.
If I set provide_context=False instead, I get the following error:
ERROR - 'NoneType' object is not subscriptable
Upvotes: 2
Views: 2721
Reputation: 1780
In this case, you need to add **kwargs to the parameters of the function you pass as python_callable.
You can see this example:
def my_mult_function(number, **kwargs):
    return number * number
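This is because, when you set provide_context=True, the PythonOperator passes the task context (entries such as conf) to the callable as keyword arguments. A catch-all **kwargs parameter absorbs every keyword argument that the function does not name explicitly, so the call no longer fails.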
You can see the PythonOperator class.
If your code returns the error “'NoneType' object is not subscriptable”, it is because your query returned no results and the code then tried to index into None. One solution is to change the query to use scalar().
You can see this example:
triggered_by = (
    session.query(Log.owner)
    .filter(Log.dag_id == "killer_dag")
    .limit(1)
    .scalar()
)
You can see more documentation in this link.
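The error message itself comes from plain Python: it is raised whenever code indexes into None. The sketch below reproduces it and shows a guard; first_column is a hypothetical helper written for this illustration and is not part of SQLAlchemy or Airflow:

```python
def first_column(row):
    """Return the first column of a result row, or None when there is no row."""
    # Guard first: subscripting None is what raises the TypeError.
    if row is None:
        return None
    return row[0]


# An empty result, e.g. what SQLAlchemy's Query.first() returns when no row matches.
no_row = None

try:
    no_row[0]
    error_message = None
except TypeError as exc:
    error_message = str(exc)

owner = first_column(("airflow",))  # a one-column row with a made-up value
missing = first_column(no_row)
print(error_message)
print(owner, missing)
```

Either way, the calling code still has to decide what to do when the value is None.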
Upvotes: 1