HG K

Reputation: 327

Can I use GCP Client API in Airflow tasks directly?

I'm using Airflow (GCP Composer) now.

I know it has a GCS hook and that I can download GCS files with it.

But I'd like to read a file only partially.

Can I use this Python logic with a PythonOperator in a DAG?

from google.cloud import storage

def my_func():
    client = storage.Client()
    bucket = client.get_bucket("mybucket")
    blob = bucket.get_blob("myfile")
    # download only the first 100 bytes of the object
    data = blob.download_as_bytes(end=100)
    return data

In an Airflow task, is it forbidden to call the client API directly instead of going through a hook?

Upvotes: 1

Views: 749

Answers (1)

Elad Kalif

Reputation: 15931

You can, but the more Airflow-idiomatic way to handle missing functionality in a hook is to extend the hook:

from airflow.providers.google.cloud.hooks.gcs import GCSHook

class MyGCSHook(GCSHook):

    def download_bytes(
        self,
        bucket_name: str,
        object_name: str,
        end: int,
    ) -> bytes:
        """Download only the first `end` bytes of a GCS object."""
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name)
        return blob.download_as_bytes(end=end)

Then you can call the hook's function from a PythonOperator or from a custom operator.
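For example, the callable you hand to a PythonOperator could look roughly like this. This is a sketch, not tested against a live Composer environment: the `MyGCSHook` name comes from the snippet above, while `read_first_bytes`, the task id, and the bucket/object names are placeholders I made up. The hook is passed in as a parameter so the logic stays easy to test without GCP credentials.

```python
def read_first_bytes(hook, bucket_name: str, object_name: str, end: int) -> bytes:
    """The body of the task: fetch only the first `end` bytes via the extended hook."""
    return hook.download_bytes(bucket_name=bucket_name, object_name=object_name, end=end)

# Wiring it into a DAG would then look something like:
#
#     PythonOperator(
#         task_id="read_head",
#         python_callable=lambda: read_first_bytes(MyGCSHook(), "mybucket", "myfile", 100),
#     )
```

Keeping the GCS logic in the hook and the task callable thin is the usual Airflow pattern: the hook owns the connection handling, and the operator just orchestrates.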

Note that GCSHook already has a download function, as you mention. What you may have missed is that if you don't provide filename, it downloads the object as bytes (see the source code). It doesn't let you set the end parameter the way you want, but that should be an easy fix to PR if you are looking to contribute to Airflow open source.

Upvotes: 2

Related Questions