SimbaPK

Reputation: 596

Execute gcloud commands with python subprocess in Airflow task

I want to build Airflow tasks that use multiple gcloud commands. A simple example :

import subprocess

def worker(**kwargs):
    exe = subprocess.run(["gcloud", "compute", "instances", "list"],
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    print(exe.returncode)
    for line in exe.stdout.splitlines():
        print(line.decode())

    exe = subprocess.run(["gcloud", "compute", "ssh", "user@host", "--command=pwd"],
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    print(exe.returncode)
    for line in exe.stdout.splitlines():
        print(line.decode())

dag = DAG("TEST", default_args=default_args, schedule_interval=None)


worker_task = PythonOperator(task_id='sample-task', python_callable=worker, provide_context=True, dag=dag)

worker_task

I have this error :

ERROR: gcloud crashed (AttributeError): 'NoneType' object has no attribute 'isatty'

Apart from airflow, these commands work fine.

I've already tried disabling gcloud's interactive prompts with --quiet, but that doesn't help.

I don't want to use Airflow's GcloudOperator, because these commands must be integrated into a custom operator.

Thank you in advance for your help.

Upvotes: 0

Views: 1366

Answers (2)

Hussein Awala

Reputation: 5110

As I see it, your two commands are independent, so you can run them in two separate BashOperator tasks. If you want to access the output of a command, each task's output will be available as an XCom, which you can read using ti.xcom_pull(task_ids='<the task id>').
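A minimal sketch of that suggestion, reusing the gcloud commands and default_args from the question. Import paths assume Airflow 2.x, and the task ids (list-instances, ssh-pwd, read-output) are illustrative; by default BashOperator pushes the last line of the command's stdout to XCom.

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG("TEST", default_args=default_args, schedule_interval=None)

# Each gcloud command becomes its own task.
list_instances = BashOperator(
    task_id="list-instances",
    bash_command="gcloud compute instances list",
    dag=dag,
)

ssh_pwd = BashOperator(
    task_id="ssh-pwd",
    bash_command="gcloud compute ssh user@host --command=pwd",
    dag=dag,
)

# A downstream task can pull each command's output back from XCom.
def read_output(ti, **kwargs):
    print(ti.xcom_pull(task_ids="list-instances"))
    print(ti.xcom_pull(task_ids="ssh-pwd"))

reader = PythonOperator(task_id="read-output", python_callable=read_output, dag=dag)

[list_instances, ssh_pwd] >> reader
```

Splitting the commands this way also gives each one independent retries and logs, which subprocess calls inside a single PythonOperator do not.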

Upvotes: 1

GuziQ

Reputation: 121

Maybe use BashOperator?

worker_task = BashOperator(task_id="sample-task", bash_command='gcloud compute instances list', dag=dag)

Upvotes: 0
