Stefano G.
Stefano G.

Reputation: 309

airflow: how can i put the method for read a json file in a local library

I must generate some dag. I've saved the json table schema files on GCP bucket. The files on the GCP bucket associates to composer will be remapped on /home/airflow/gcs/dags/. If i define the method for read the json file, after the creation of the dag, all goes fine. But if I wish generate some "common code" (for put it on a library of mine), I can't access to FileSystem using the code in the library, in the specific I can't use the python json library.

The strange thing is that, I define the method out of the dag creation step, but I invoke it only after the dag creation!

To complete the discussion, i haven't problems if the code in the library uses only in memory objects.

I've this issue when i work with airflow (1.9 on GCP driver by composer)

This is my external library:

lib/
    __init__.py
    bb_airflow_utils.py

on external library

def load_json_file(fname):
    #per far sì che il dag la veda
    with open(fname, 'r') as f:
        d = json.load(f)
    return d

on principal script


from lib.bb_airflow_utils import *
ROOT_PATH = 'home/airflow/gcs/dags'
IDCLI = 'goofy'
...
...
with DAG(dag_id=dag_name, default_args=dag_args) as dag:
    filepath = path.join(ROOT_PATH, '{}-todwh.json'.format(IDCLI))
    get_data = load_json_file(filepath)
    .....
    task_dummy_start = DummyOperator(task_id='task_{}_start'.format(dag_name), dag=dag)
    .....

Airflow ignore the operator and by UI said that the dag has not SLA

Upvotes: 4

Views: 8580

Answers (1)

kaxil
kaxil

Reputation: 18874

Have a look at https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies#install-local.

You can put common code in a separate file and put it in separate folder like the example below.

Place the dependencies within a subdirectory in the dags/ folder. To import a module from a subdirectory, each subdirectory in the module's path must contain a __init__.py package marker file.

In this example, the dependency is coin_module.py:

dags/
  use_local_deps.py  # A DAG file.
  dependencies/
    __init__.py
    coin_module.py

Import the dependency from the DAG definition file.

For example:

from dependencies import coin_module

Upvotes: 6

Related Questions