Reputation: 37
I am trying to read the file names from a GCS bucket recursively, across all folders and subfolders under the bucket, using a Composer DAG. Is this possible? For example, I have a bucket with the folders and subfolders shown below, where static is the bucket name.
static/folder1/subfolder1/file1.json
static/folder1/subfolder2/file2.json
static/folder1/subfolder3/file3.json
static/folder1/subfolder3/file4.json
I want to read the files recursively and put the data in two variables like below.
bucketname = static
filepath = static/folder1/subfolder3/file4.json
Upvotes: 1
Views: 2262
Reputation: 837
You can use Airflow's BashOperator to run the GCS CLI tool, gsutil.
An example could be the following:
from airflow.operators.bash_operator import BashOperator

# Lists every object under the bucket recursively
read_files = BashOperator(
    task_id='read_files',
    bash_command='gsutil ls -r gs://bucket',
    dag=dag,
)
Edit: Since you want to capture the output, and the BashOperator only pushes the last line of stdout to XCom, I would suggest a PythonOperator that calls a custom Python callable, which uses either the GCS API or the CLI tool via subprocess to collect all the file names and push them to XCom for downstream tasks (see the sketch below). If no other tasks need this data, you can simply process it inside the callable however you like (that part is not clear from the question).
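A minimal sketch of such a callable using the google-cloud-storage client library (the task id, the callable name, and the returned bucketname/filepath pairs are illustrative; the import path assumes Airflow 2, on Airflow 1 it would be airflow.operators.python_operator):

from airflow.operators.python import PythonOperator
from google.cloud import storage

def list_gcs_files(bucket_name, prefix=None):
    # list_blobs is recursive by default (no delimiter given),
    # so it walks every folder and subfolder under the prefix
    client = storage.Client()
    blobs = client.list_blobs(bucket_name, prefix=prefix)
    # The return value of a PythonOperator callable is pushed
    # to XCom automatically under the key 'return_value'
    return [
        {'bucketname': bucket_name, 'filepath': f'{bucket_name}/{blob.name}'}
        for blob in blobs
    ]

list_files = PythonOperator(
    task_id='list_files',
    python_callable=list_gcs_files,
    op_kwargs={'bucket_name': 'static', 'prefix': 'folder1/'},
    dag=dag,
)

A downstream task can then pull the full list with ti.xcom_pull(task_ids='list_files').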
Upvotes: 2