Reputation: 596
I want to build Airflow tasks that run multiple gcloud commands. A simple example:
def worker(**kwargs):
    exe = subprocess.run(["gcloud", "compute", "instances", "list"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    print(exe.returncode)
    for line in exe.stdout.splitlines():
        print(line.decode())
    exe = subprocess.run(["gcloud", "compute", "ssh", "user@host", "--command=pwd"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    print(exe.returncode)
    for line in exe.stdout.splitlines():
        print(line.decode())

dag = DAG("TEST", default_args=default_args, schedule_interval=None)
worker_task = PythonOperator(task_id='sample-task', python_callable=worker, provide_context=True, dag=dag)
worker_task
I get this error:
ERROR: gcloud crashed (AttributeError): 'NoneType' object has no attribute 'isatty'
Outside of Airflow, these commands work fine.
I've already tried disabling gcloud interactive mode with "--quiet", but that doesn't help.
I don't want to use the GcloudOperator from Airflow, because these commands must be integrated into a custom operator.
Thank you in advance for your help.
Upvotes: 0
Views: 1366
Reputation: 5110
As far as I can see, your two commands are independent, so you can run them as two separate tasks using BashOperator. If you want to access the output of the commands, the output of each one is available as an XCom, which you can read using ti.xcom_pull(task_ids='<the task id>').
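The suggestion above could be wired up roughly as follows. This is only a sketch under a few assumptions: the import paths are those of Airflow 2.x (in 1.10 the modules are airflow.operators.bash_operator and airflow.operators.python_operator, and BashOperator needs xcom_push=True), the task ids and the report_outputs and build_dag helpers are made-up names for illustration, and it has not been checked here whether BashOperator avoids the isatty crash. Note that BashOperator pushes only the last line of stdout to XCom, not the full output.

```python
from datetime import datetime

# Hypothetical task ids, chosen for this sketch only.
LIST_TASK_ID = "list-instances"
SSH_TASK_ID = "ssh-pwd"


def report_outputs(ti, **kwargs):
    """Collect what each BashOperator task pushed to XCom.

    BashOperator pushes only the last line of stdout, so each value is a
    single string (or None if nothing was pushed).
    """
    outputs = {
        task_id: ti.xcom_pull(task_ids=task_id)
        for task_id in (LIST_TASK_ID, SSH_TASK_ID)
    }
    for task_id, last_line in outputs.items():
        print(f"{task_id}: {last_line}")
    return outputs


def build_dag():
    # Airflow is imported here so the helper above stays importable without it.
    # Import paths assume Airflow 2.x.
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator

    dag = DAG("TEST", start_date=datetime(2021, 1, 1), schedule_interval=None)

    list_task = BashOperator(
        task_id=LIST_TASK_ID,
        bash_command="gcloud compute instances list",
        dag=dag,
    )
    ssh_task = BashOperator(
        task_id=SSH_TASK_ID,
        bash_command="gcloud compute ssh user@host --command=pwd",
        dag=dag,
    )
    report_task = PythonOperator(
        task_id="report-outputs",
        python_callable=report_outputs,
        dag=dag,
    )
    # The two gcloud tasks are independent; the report task waits for both.
    [list_task, ssh_task] >> report_task
    return dag
```

In an actual DAG file you would assign dag = build_dag() at module level so that Airflow discovers it when parsing the file.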
Upvotes: 1
Reputation: 121
Maybe use BashOperator?
worker_task = BashOperator(task_id="sample-task", bash_command="gcloud compute instances list", dag=dag)
Upvotes: 0