Reputation: 111
I am trying to run a shell script on a Dataproc cluster once the cluster is set up. I am not sure what parameters need to be passed to the operator in order to trigger the .sh file once the cluster is up and running.
Sample Airflow code for creating the cluster:
create_cluster = DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    cluster_name=DAG_CONFIG['DATAPROC']['cluster_name'],
    project_id=DAG_CONFIG['PROJECT_ID'],
    num_workers=DAG_CONFIG['DATAPROC']['num_workers'],
    zone=DAG_CONFIG['DATAPROC']['zone'],
    subnetwork_uri=DAG_CONFIG['DATAPROC']['subnetwork_uri'],
    master_machine_type='n1-standard-1',
    master_disk_type='pd-standard',
    master_disk_size=50,
    worker_machine_type='n1-standard-1',
    worker_disk_type='pd-standard',
    worker_disk_size=50,
    auto_delete_ttl=DAG_CONFIG['DATAPROC']['auto_delete_ttl'],
    storage_bucket=DAG_CONFIG['GCS_STAGING']['bucket_name'],
    dag=DAG_ID)
This is where I need to submit the shell script, via DataProcHadoopOperator or whichever operator is suitable.
Shell_Task = DataProcHadoopOperator(
    task_id='shell_Submit',
    main_jar='???',
    project_id='xxx',
    arguments=[??],
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name=DAG_CONFIG['DATAPROC']['cluster_name'],
    gcp_conn_id='google_cloud_default',
    region=DAG_CONFIG['DATAPROC']['zone'],
    dag=DAG_ID)
Any help would be appreciated.
Upvotes: 2
Views: 2359
Reputation: 4465
To run a shell script on an existing Dataproc cluster with Airflow, you can use DataprocSubmitJobOperator to submit a Pig job that executes the shell script (for example, via Pig's sh command). Note that in this case the shell script will be executed only on the cluster's master node.
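A minimal sketch of that approach is below. It assumes the apache-airflow-providers-google package, a placeholder script path gs://<BUCKET>/path/to/script.sh, a region key in DAG_CONFIG, and a DAG object named dag; adjust these to your setup.
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Dataproc job definition: a Pig job whose queries copy the script from GCS
# to the master node and shell out to run it there.
SHELL_SCRIPT_JOB = {
    "reference": {"project_id": DAG_CONFIG['PROJECT_ID']},
    "placement": {"cluster_name": DAG_CONFIG['DATAPROC']['cluster_name']},
    "pig_job": {
        "query_list": {
            "queries": [
                "fs -cp -f gs://<BUCKET>/path/to/script.sh file:///tmp/script.sh",
                "sh chmod +x /tmp/script.sh",
                "sh /tmp/script.sh",
            ]
        }
    },
}

shell_task = DataprocSubmitJobOperator(
    task_id='shell_submit',
    job=SHELL_SCRIPT_JOB,
    region=DAG_CONFIG['DATAPROC']['region'],  # Dataproc region (e.g. us-central1), not a zone
    project_id=DAG_CONFIG['PROJECT_ID'],
    gcp_conn_id='google_cloud_default',
    dag=dag)  # assumes a DAG object named `dag` is in scope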
To run a shell script on every Dataproc VM during cluster creation, you should use Dataproc initialization actions. You can specify them via DataprocClusterCreateOperator:
DataprocClusterCreateOperator(
    # ...
    init_actions_uris=['gs://<BUCKET>/path/to/init/action.sh'],
    # ...
)
Upvotes: 0