cpatino08

Reputation: 105

Airflow Process with an Extremely High Computation Step

I have a step involving a large flow of data that needs to be processed in Python (at the moment using pandas). I was just curious about suggestions on, one, making sure I have enough resources to execute large data manipulations for several different client configurations, and two, how to make this process more efficient (e.g. using PySpark or other tools; I'm new to these, so bear with any follow-up questions). Thanks for the help. I'll try to add more details if needed; I just want to keep this fairly high level.

The pipeline currently consists of a few PythonOperators, a few BashOperators, and a few BigQuery operators (running through GCP Composer).

Upvotes: 1

Views: 699

Answers (1)

Sandeep Mohanty

Reputation: 1552

Cloud Composer is used to schedule pipelines.

For the first query:

Depending upon the resources you are using, if multiple tasks are running in a DAG and the number of operators is large, you need to configure your Composer environment so that its resources match the workload.

  • The node count can be increased depending upon the number of task instances.
  • As multiple tasks can be scheduled in a single DAG, the machine type can be changed to a workload-optimized machine and the disk size increased, to keep the environment from slowing down.

For the second query:

PySpark is the Python API for Apache Spark, which handles large data sets by processing them in parallel, in batches, across a cluster. So PySpark can be used to make this kind of data processing more efficient.

To run PySpark code from Cloud Composer you need to create a Dataproc cluster, since PySpark jobs run on Dataproc clusters. In the DAG, you can schedule the cluster creation with DataprocCreateClusterOperator. After the cluster is created, you can submit your PySpark job to it using DataprocSubmitJobOperator; to submit a job you need to provide a job source file. You can refer to the code below for reference.

PySpark Job code:


import pyspark
from operator import add

sc = pyspark.SparkContext()

# Toy job: count the occurrences of each character in "Hello World"
# and sort the counts in descending order.
data = sc.parallelize(list("Hello World"))
counts = (
    data.map(lambda x: (x, 1))
    .reduceByKey(add)
    .sortBy(lambda x: x[1], ascending=False)
    .collect()
)

for (char, count) in counts:
    print("{}: {}".format(char, count))


DAG code:


import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "give your project id"
CLUSTER_NAME = "your dataproc cluster name that you want to create"
REGION = "us-central1"
PYSPARK_URI = "GCS location of your PySpark code, i.e. gs://[input file]"

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    'start_date': YESTERDAY,
}

# Cluster definition: one master and two workers, all n1-standard-4
# with 1024 GB standard persistent boot disks.
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
}
with models.DAG(
    "dataproc",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create the Dataproc cluster that the PySpark job will run on.
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
    )

    # Job definition pointing at the PySpark source file in GCS.
    PYSPARK_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
    }

    # Submit the PySpark job to the cluster created above.
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task",
        job=PYSPARK_JOB,
        region=REGION,
        project_id=PROJECT_ID,
    )

    create_cluster >> pyspark_task
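
If the cluster is only needed for this one job, it is usually worth tearing it down after the PySpark task finishes so you are not billed for an idle cluster. A minimal sketch, to be merged into the same DAG (with the extra imports added at the top of the file):

from airflow.providers.google.cloud.operators.dataproc import DataprocDeleteClusterOperator
from airflow.utils.trigger_rule import TriggerRule

    # Inside the same "with models.DAG(...)" block:
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        # Run the teardown even if the PySpark job fails, so the cluster never lingers.
        trigger_rule=TriggerRule.ALL_DONE,
    )

    create_cluster >> pyspark_task >> delete_cluster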

Dataflow can also be used for data processing, for both batch and streaming data.
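
For completeness, Dataflow jobs are written with the Apache Beam Python SDK. The sketch below is only a minimal illustration; the project, region, and bucket paths are hypothetical placeholders, and from Composer such a pipeline would be launched with the Dataflow/Beam operators from the same Google provider package.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Minimal Beam sketch: read lines from GCS, count words, write the counts back out.
# Project, region, and bucket names are hypothetical placeholders.
options = PipelineOptions(
    runner="DataflowRunner",
    project="your-project-id",
    region="us-central1",
    temp_location="gs://your-bucket/tmp",
)

with beam.Pipeline(options=options) as p:
    (
        p
        | "Read" >> beam.io.ReadFromText("gs://your-bucket/input/*.txt")
        | "Split" >> beam.FlatMap(lambda line: line.split())
        | "PairWithOne" >> beam.Map(lambda word: (word, 1))
        | "Count" >> beam.CombinePerKey(sum)
        | "Format" >> beam.MapTuple(lambda word, count: "{}: {}".format(word, count))
        | "Write" >> beam.io.WriteToText("gs://your-bucket/output/counts")
    )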

Upvotes: 1
