Reputation: 327
I'm using Airflow (GCP Cloud Composer) now.
I know it has a GCS hook and I can download GCS files with it.
But I'd like to read a file only partially.
Can I use this Python logic with a PythonOperator in a DAG?
from google.cloud import storage

def my_func():
    client = storage.Client()
    bucket = client.get_bucket("mybucket")
    blob = bucket.get_blob("myfile")
    # Download only the beginning of the object (end is a byte offset)
    data = blob.download_as_bytes(end=100)
    return data
In an Airflow task, is it forbidden to call the client API directly instead of going through hooks?
Upvotes: 1
Views: 749
Reputation: 15931
You can, but a more Airflow-idiomatic way to handle functionality missing from the hook is to extend the hook:
from airflow.providers.google.cloud.hooks.gcs import GCSHook

class MyGCSHook(GCSHook):
    def download_bytes(
        self,
        bucket_name: str,
        object_name: str,
        end: int,
    ) -> bytes:
        # Reuse the hook's connection handling, then do a ranged download
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name)
        return blob.download_as_bytes(end=end)
Then you can use the hook function in PythonOperator or in a custom operator.
Note that GCSHook
has a download function, as you mention.
What you may have missed is that if you don't provide filename, it downloads the object as bytes (see the source code). It doesn't let you configure the end
parameter as you expect, but that would be an easy fix to PR to Airflow if you are looking to contribute to the open source project.
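Since a ranged GCS download is awkward to exercise against a real bucket, one way to sanity-check the call chain before wiring it into a hook or DAG is to inject a mocked storage client using only the standard library. This is a minimal sketch; the standalone download_bytes function and the fake return value are illustrative, not part of the GCSHook API:

```python
from unittest import mock

def download_bytes(client, bucket_name: str, object_name: str, end: int) -> bytes:
    # Same call chain as the hook method above, with the client passed in
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name=object_name)
    return blob.download_as_bytes(end=end)

# Fake client standing in for storage.Client(); we pretend a ranged read
# with end=100 returns 101 bytes (GCS byte ranges are inclusive of end)
fake_client = mock.Mock()
fake_blob = fake_client.bucket.return_value.blob.return_value
fake_blob.download_as_bytes.return_value = b"x" * 101

data = download_bytes(fake_client, "mybucket", "myfile", end=100)

# Verify the expected calls were made with the expected arguments
fake_client.bucket.assert_called_once_with("mybucket")
fake_blob.download_as_bytes.assert_called_once_with(end=100)
```

The same mocking approach works for the hook subclass itself by patching its get_conn method.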
Upvotes: 2