Reputation: 51
I'm trying to execute the DAG below. It seems that the operator creating the Dataproc cluster does not enable the optional components needed for Jupyter Notebook and Anaconda. I found this code here: Component Gateway with DataprocOperator on Airflow to try to solve it, but it didn't work for me; I think the Composer (Airflow) version there is different. My version is composer-2.0.0-preview.5, airflow-2.1.4.
The operator works perfectly when creating the cluster, but it doesn't create it with the optional components that enable Jupyter Notebook. Does anyone have any ideas to help me?
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataProcSparkOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
yesterday = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())

default_args = {
    'owner': 'teste3',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'teste-dag-3', catchup=False, default_args=default_args, schedule_interval=None)
# configure the components
class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = ['JUPYTER', 'ANACONDA']
        return cluster_data
create_cluster = CustomDataprocClusterCreateOperator(
    dag=dag,
    task_id='start_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    num_workers=2,
    num_masters=1,
    master_machine_type='n2-standard-8',
    worker_machine_type='n2-standard-8',
    worker_disk_size=500,
    master_disk_size=500,
    master_disk_type='pd-ssd',
    worker_disk_type='pd-ssd',
    image_version='1.5.56-ubuntu18',
    tags=['allow-dataproc-internal'],
    region="us-central1",
    zone='us-central1-f',  # Variable.get('gc_zone'),
    storage_bucket="bucket-dataproc-ge",
    labels={'product': 'sample-label'},
    service_account_scopes=['https://www.googleapis.com/auth/cloud-platform'],
    # properties={"yarn:yarn.nodemanager.resource.memory-mb": 15360, "yarn:yarn.scheduler.maximum-allocation-mb": 15360},
    # subnetwork_uri="projects/project-id/regions/us-central1/subnetworks/dataproc-subnet",
    retries=1,
    retry_delay=timedelta(minutes=1)
)  # starts a Dataproc cluster
stop_cluster_example = DataprocClusterDeleteOperator(
    dag=dag,
    task_id='stop_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
)  # stops a running Dataproc cluster
create_cluster >> stop_cluster_example
Upvotes: 5
Views: 1017
Reputation: 16139
Edit:
After taking a deeper look, you don't need a custom operator anymore. The updated DataprocCreateClusterOperator has enable_component_gateway and optional_components, so you can just set them directly:
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator, DataprocCreateClusterOperator

CLUSTER_GENERATOR = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    ...,
    enable_component_gateway=True,
    optional_components=['JUPYTER', 'ANACONDA'],
).make()

DataprocCreateClusterOperator(
    ...,
    cluster_config=CLUSTER_GENERATOR,
)
You can check this example DAG for more details.
You can view all possible parameters of ClusterGenerator
in the source code.
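Putting the above together, here is a minimal end-to-end sketch; the project ID, machine types, and DAG settings are illustrative placeholders (not values from your environment), and it assumes a google provider version whose ClusterGenerator supports enable_component_gateway:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
)

PROJECT_ID = "my-project"   # placeholder
REGION = "us-central1"      # placeholder

# ClusterGenerator builds the cluster_config dict, including the
# Component Gateway and optional-components settings.
CLUSTER_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    num_workers=2,
    master_machine_type="n2-standard-8",
    worker_machine_type="n2-standard-8",
    image_version="1.5.56-ubuntu18",
    enable_component_gateway=True,
    optional_components=["JUPYTER", "ANACONDA"],
).make()

with DAG(
    "dataproc_component_gateway_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name="teste-ge-{{ ds }}",
        cluster_config=CLUSTER_CONFIG,
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name="teste-ge-{{ ds }}",
    )

    create_cluster >> delete_cluster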
Original Answer:
The operator was rewritten (see PR). I think the issue is with your _build_cluster_data function.
You should probably change your code to:
def _build_cluster_data(self):
    cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
    cluster_data['config']['endpoint_config'] = {
        'enableHttpPortAccess': True
    }
    cluster_data['config']['software_config']['optional_components'] = ['JUPYTER', 'ANACONDA']  # redundant, see note 2
    return cluster_data
A few notes:
1. DataprocClusterCreateOperator is deprecated. You should use DataprocCreateClusterOperator from the Google provider.
2. You don't need the cluster_data['config']['software_config']['optional_components'] line; you can set the value directly by passing optional_components to the operator (see the source code). A sketch of this follows below.
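To illustrate note 2, a minimal sketch, assuming the deprecated operator accepts optional_components as its source code shows; only the endpoint_config override is kept, and the names reuse the question's DAG:

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def _build_cluster_data(self):
        # Only Component Gateway still needs a manual override here;
        # the optional components are handled by the operator itself.
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpoint_config'] = {
            'enableHttpPortAccess': True
        }
        return cluster_data

create_cluster = CustomDataprocClusterCreateOperator(
    dag=dag,
    task_id='start_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
    num_workers=2,
    optional_components=['JUPYTER', 'ANACONDA'],  # passed directly, no softwareConfig override needed
)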
Upvotes: 2