Sergey Bakaev Rettley
Sergey Bakaev Rettley

Reputation: 45

Why does airflow or astronomer convert list to ndarray?

Why is the json data type automatically converted to ndarray when using airflow/astronomer?

I have a dag that looks like this:

@dag(
    schedule='@hourly',
    catchup=False,
    default_args=default_args,
    tags=['market', 'items']
)
def market_items():

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def get_market_items():
        from src.transform import MarketTables
        data = MarketTables().items()
        print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])
        # >>>  [{'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'list'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
        # look at  {'photos': (<class 'str'>, <class 'list'>)}
        return data

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def update_market_items(data):
        from src.databases.DatabaseWorker import DatabaseWorker
        from src.databases.company.market_schema import MarketItems

        db = DatabaseWorker(host='host.docker.internal', port=5431)
        print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])

        # >>> {'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
        # look at {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}
        with db.session() as session:
            session.upsert(
                MarketItems,
                data['items'],
                on_conflict='do_update',
                deletable=True
            )
    _get_market_items = get_market_items()
    _get_market_items >> update_market_items(_get_market_items) >> [
        update_market_items_characteristics(_get_market_items),
        update_market_items_sizes(_get_market_items),
        update_market_items_tags(_get_market_items)
    ]

market_items()

When executing this dag, for some reason, during the transfer of the DataFrame between tasks, json is converted to ndarray, although in fact the data type in the photos column is List[Dict]. At the same time, when I execute the same code without using airflow/astronomer, the data type is not converted to ndarray, but List[Dict] remains, which is then normally inserted into PostgreSQL using SqlAlchemy. Why does this happen and how can it be avoided?

I use pandas==2.2.0 everywhere

Upvotes: 0

Views: 26

Answers (0)

Related Questions