Component Gateway activation on Dataproc does not work with the Composer (Airflow) operator airflow.providers.google.cloud.operators.dataproc

I’m trying to execute the DAG below. It seems that the operator that creates the Dataproc cluster does not enable the optional components needed for Jupyter notebook and Anaconda. I found this code here: Component Gateway with DataprocOperator on Airflow and tried it, but it didn't solve the problem for me; I think the Composer (Airflow) version there is different. My version is composer-2.0.0-preview.5 with airflow-2.1.4.

The operator creates the cluster perfectly, but it doesn't create it with the optional components that enable the Jupyter notebook. Does anyone have any ideas to help me?

from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator,DataprocClusterDeleteOperator, DataProcSparkOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

yesterday = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())


default_args = {
    'owner': 'teste3',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),

}

dag = DAG(
    'teste-dag-3',catchup=False, default_args=default_args, schedule_interval=None)


# configure the optional components
class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = [ 'JUPYTER', 'ANACONDA' ]
        return cluster_data


create_cluster = CustomDataprocClusterCreateOperator(
        dag=dag,
        task_id='start_cluster_example',
        cluster_name='teste-ge-{{ ds }}',
        project_id="sandbox-coe",
        num_workers=2,
        num_masters=1,
        master_machine_type='n2-standard-8',
        worker_machine_type='n2-standard-8',
        worker_disk_size=500,
        master_disk_size=500,
        master_disk_type='pd-ssd',
        worker_disk_type='pd-ssd',
        image_version='1.5.56-ubuntu18',
        tags=['allow-dataproc-internal'],
        region="us-central1",
        zone='us-central1-f',  # Variable.get('gc_zone')
        storage_bucket="bucket-dataproc-ge",
        labels={'product': 'sample-label'},
        service_account_scopes=['https://www.googleapis.com/auth/cloud-platform'],
        # properties={"yarn:yarn.nodemanager.resource.memory-mb": 15360, "yarn:yarn.scheduler.maximum-allocation-mb": 15360},
        # subnetwork_uri="projects/project-id/regions/us-central1/subnetworks/dataproc-subnet",
        retries=1,
        retry_delay=timedelta(minutes=1)
    )  # starts a Dataproc cluster


stop_cluster_example = DataprocClusterDeleteOperator(
    dag=dag,
    task_id='stop_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
    )  # stops a running Dataproc cluster




create_cluster >> stop_cluster_example

Answers (1)

Elad Kalif

Edit: After taking a deeper look, you don't need a custom operator any more. The updated ClusterGenerator used with DataprocCreateClusterOperator has enable_component_gateway and optional_components, so you can just set them directly:

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator, DataprocCreateClusterOperator

# Build the cluster config with the component gateway and the
# optional components enabled, then hand it to the create operator.
CLUSTER_GENERATOR = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    ...,
    enable_component_gateway=True,
    optional_components=['JUPYTER', 'ANACONDA'],
).make()

DataprocCreateClusterOperator(
    ...,
    cluster_config=CLUSTER_GENERATOR,
)

You can check this example DAG for more details, and you can view all the possible parameters of ClusterGenerator in the source code.
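
For context, here is a minimal, self-contained DAG sketch of this approach. The project, bucket, zone, and cluster names are placeholders carried over from the question, and it assumes a google provider version recent enough for ClusterGenerator to accept enable_component_gateway, as described above:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
)

PROJECT_ID = "sandbox-coe"  # placeholder, from the question
REGION = "us-central1"

# Generate the cluster config dict, including the component gateway
# and the optional components for Jupyter/Anaconda.
CLUSTER_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-central1-f",
    num_workers=2,
    master_machine_type="n2-standard-8",
    worker_machine_type="n2-standard-8",
    storage_bucket="bucket-dataproc-ge",  # placeholder, from the question
    image_version="1.5.56-ubuntu18",
    optional_components=["JUPYTER", "ANACONDA"],
    enable_component_gateway=True,
).make()

with DAG(
    "dataproc-component-gateway-example",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name="teste-ge-{{ ds }}",
        cluster_config=CLUSTER_CONFIG,
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name="teste-ge-{{ ds }}",
    )

    create_cluster >> delete_cluster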

Original Answer: The operator was re-written (see PR). I think the issue is with your _build_cluster_data function.

You probably should change your code to:

def _build_cluster_data(self):
    cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
    cluster_data['config']['endpoint_config'] = {
        'enableHttpPortAccess': True
    }
    cluster_data['config']['software_config']['optional_components'] = ['JUPYTER', 'ANACONDA']  # redundant; see note 2 below
    return cluster_data

A few notes:

  1. DataprocClusterCreateOperator (the contrib class you are subclassing) is deprecated. You should use DataprocCreateClusterOperator from the google provider instead.

  2. You don't need to override cluster_data['config']['software_config']['optional_components'] at all; you can set the value directly by passing optional_components to the operator (see the source code, and the sketch after this list).
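
To illustrate note 2, here is a sketch of the slimmed-down subclass. It assumes the operator version you have installed accepts optional_components as a constructor argument and still exposes _build_cluster_data, so check your installed provider before relying on it:

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        # Only the component gateway still needs the manual override;
        # the optional components are passed as a regular argument below.
        cluster_data['config']['endpoint_config'] = {
            'enableHttpPortAccess': True
        }
        return cluster_data


create_cluster = CustomDataprocClusterCreateOperator(
    dag=dag,
    task_id='start_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
    optional_components=['JUPYTER', 'ANACONDA'],  # set directly, no override needed
    # ...remaining arguments as in the question
)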
