Hari


How to run a shell script on a Google Cloud Dataproc cluster using an Airflow Dataproc operator

I am trying to run a shell script on a Dataproc cluster once the cluster is set up, but I'm stuck: I'm not sure which parameters to pass to the operator to trigger the .sh file after the cluster is up and running.

Sample Airflow code for creating the cluster:

create_cluster = DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    cluster_name=DAG_CONFIG['DATAPROC']['cluster_name'],
    project_id=DAG_CONFIG['PROJECT_ID'],
    num_workers=DAG_CONFIG['DATAPROC']['num_workers'],
    zone=DAG_CONFIG['DATAPROC']['zone'],
    subnetwork_uri=DAG_CONFIG['DATAPROC']['subnetwork_uri'],
    master_machine_type='n1-standard-1',
    master_disk_type='pd-standard',
    master_disk_size=50,
    worker_machine_type='n1-standard-1',
    worker_disk_type='pd-standard',
    worker_disk_size=50,
    auto_delete_ttl=DAG_CONFIG['DATAPROC']['auto_delete_ttl'],
    storage_bucket=DAG_CONFIG['GCS_STAGING']['bucket_name'],
    dag=DAG_ID)

This is where I need to submit a shell script, via DataProcHadoopOperator or whichever operator is suitable:

Shell_Task = DataProcHadoopOperator(
    task_id='shell_Submit',
    main_jar='???',
    project_id='xxx',
    arguments=[??],
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name=DAG_CONFIG['DATAPROC']['cluster_name'],
    gcp_conn_id='google_cloud_default',
    region=DAG_CONFIG['DATAPROC']['zone'],
    dag=DAG_ID)

Any help would be appreciated.


Answers (1)

Igor Dvorzhak


To run a shell script on an existing Dataproc cluster from Airflow, you can use DataprocSubmitJobOperator to submit a Pig job that executes the shell script (Pig's sh command can run shell commands). Note that in this case the shell script will be executed only on the cluster's master node.
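A minimal sketch of the job payload, assuming the DataprocSubmitJobOperator from the Google provider package, which takes a Dataproc Job resource as a dict through its job argument. The helper name build_pig_shell_job and the /tmp/script.sh path are hypothetical. The Pig statements follow the commonly used pattern of copying the script out of GCS with fs and running it with sh; verify them against the Pig version on your cluster image.

```python
def build_pig_shell_job(project_id, cluster_name, script_uri, local_path="/tmp/script.sh"):
    """Return a Dataproc Job dict whose Pig query runs a shell script on the master node.

    Hypothetical helper: the dict layout follows the Dataproc Job resource
    (reference / placement / pig_job.query_list.queries).
    """
    # Pig's `fs` wraps the Hadoop FS shell, so it can copy from gs:// to the
    # master's local disk (-f overwrites a copy left by a previous run).
    # `sh` then runs ordinary shell commands on the master node.
    query = "; ".join([
        f"fs -cp -f {script_uri} file://{local_path}",
        f"sh chmod 750 {local_path}",
        f"sh {local_path}",
    ])
    return {
        "reference": {"project_id": project_id},
        "placement": {"cluster_name": cluster_name},
        "pig_job": {"query_list": {"queries": [query]}},
    }
```

In the DAG you would then pass the result as job=build_pig_shell_job(...) to DataprocSubmitJobOperator, together with task_id, project_id and region (a region such as us-central1, not a zone), and make that task run after create_cluster.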

To run a shell script on every Dataproc VM during cluster creation, use Dataproc initialization actions. You can specify them through the init_actions_uris parameter of DataprocClusterCreateOperator:

DataprocClusterCreateOperator(
    # ...
    # Each listed script runs on every node (master and workers) while the cluster is being created
    init_actions_uris=['gs://<BUCKET>/path/to/init/action.sh'],
    # ...
)
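If you use the newer Google provider package instead, DataprocCreateClusterOperator takes a cluster_config dict rather than individual keyword arguments, and the same thing is expressed through the initialization_actions field of the Dataproc ClusterConfig resource. A minimal sketch under that assumption; the helper name build_cluster_config, the 600-second timeout and the machine sizes (copied from the question) are illustrative.

```python
def build_cluster_config(init_action_uris, timeout_seconds=600, num_workers=2):
    """Return a Dataproc ClusterConfig dict that runs init actions on every node.

    Hypothetical helper; field names follow the Dataproc ClusterConfig resource.
    """
    return {
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
        "worker_config": {"num_instances": num_workers, "machine_type_uri": "n1-standard-1"},
        # Each action runs on the master and on every worker during creation;
        # creation fails if a script exits non-zero or exceeds its timeout.
        "initialization_actions": [
            {"executable_file": uri, "execution_timeout": {"seconds": timeout_seconds}}
            for uri in init_action_uris
        ],
    }
```

The result would be passed as cluster_config=... together with project_id, region and cluster_name. Because initialization actions block cluster creation, they suit setup work; a one-off job that should run after the cluster is ready fits the job-submission approach better.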

