seve

Cannot get custom MySQLOperator to work in Airflow: extra_dejson error with hook

I'm trying to write an operator that will download some API data and place it into a table using a dataframe. I've got the following operator code written up:

from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from typing import List
from pycoingecko import CoinGeckoAPI
import pandas as pd


class CryptoToMySql(BaseOperator):
    @apply_defaults
    def __init__(
            self,
            name: str,
            coins: List[str],
            mysql_conn_id: str = None,
            tablename: str = None,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name
        self.coins = coins
        self.mysql_conn_id = mysql_conn_id
        self.tablename = tablename

    # TODO: Test this to see if it can return data
    @staticmethod
    def _get_cryptos(coins):
        cg = CoinGeckoAPI()
        data = cg.get_price(ids=coins, vs_currencies='usd', include_market_cap=True, include_24hr_vol=True,
                            include_24hr_change=True, include_last_updated_at=True)

        df = pd.DataFrame.from_dict(data, orient='index').reset_index()
        df.rename(columns={'index': 'crypto'}, inplace=True)
        return df

    def execute(self, context):
        hook = MySqlHook(schema='source', connection=self.mysql_conn_id)
        conn = hook.get_conn()
        data = self._get_cryptos(self.coins)

        data.to_sql(self.tablename, conn, if_exists='append', index=False)

        message = f" Saving data to {self.tablename}"
        print(message)
        return message

and the following DAG:

from datetime import timedelta, datetime
from airflow import DAG
from operators.operators import CryptoToMySql
from airflow.operators.dummy import DummyOperator

coins = ['bitcoin', 'litecoin', 'ethereum', 'dogecoin']

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'crypto_dag',  # dag_id may not contain spaces
    default_args=default_args,
    description='Pulls various crypto prices every interval',
    schedule_interval='@hourly',
    start_date=(datetime(2021, 5, 9)),
    tags=['crypto']
) as dag:
    t1 = DummyOperator(
        task_id='dummy-1'
    )

    t2 = CryptoToMySql(
        task_id='load_data',
        name='crypto_task',
        coins=coins,
        mysql_conn_id='pinwheel_source',
        tablename='stonks'
    )

    t1 >> t2
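Unrelated to the hook error, but worth checking: Airflow validates `dag_id` (and `task_id`) strings when the DAG is built. To my knowledge, only letters, digits, underscores, dashes and dots are accepted, so an id containing a space is rejected before any task runs. Here is a minimal stdlib sketch of that rule. The regex, the 250-character limit and the helper name `is_valid_key` are my reading of Airflow's key validation, so treat them as assumptions.

```python
import re

# Assumed rule, modelled on Airflow's key validation: word characters,
# dots and dashes only, and at most 250 characters.
KEY_REGEX = re.compile(r"[\w.-]+")


def is_valid_key(key: str, max_length: int = 250) -> bool:
    """Return True if `key` would pass the assumed dag_id/task_id rule."""
    return len(key) <= max_length and KEY_REGEX.fullmatch(key) is not None


for candidate in ["crypt dag", "crypto_dag", "dummy-1", "load_data"]:
    print(candidate, is_valid_key(candidate))
```

Under this rule, the ids `dummy-1` and `load_data` used for the tasks are fine. Only an id with a space fails.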

However, I get the following error at the hook.get_conn() call:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/mysql/hooks/mysql.py", line 162, in get_conn
    client_name = conn.extra_dejson.get('client', 'mysqlclient')
AttributeError: 'str' object has no attribute 'extra_dejson'

Further research into the source code of the MySqlHook class reveals the following line where the issue arises:

        conn = self.connection or self.get_connection(
            getattr(self, self.conn_name_attr)
        )  # pylint: disable=no-member

        client_name = conn.extra_dejson.get('client', 'mysqlclient')

extra_dejson is a property of the connection object that parses the connection's Extra field as JSON. However, this isn't described anywhere in the documentation, and the MySqlHook code simply expects the conn_id to be passed, as described here and here and here.

I've confirmed that the connection I'm passing exists in the DB and that the AIRFLOW_HOME variable points to the correct directory.

The connection:


id | conn_id         | conn_type | description | host      | schema | login    | password | port | is_encrypted | is_extra_encrypted | extra_dejson | get_uri
===+=================+===========+=============+===========+========+==========+==========+======+==============+====================+==============+========
48 | pinwheel_source | mysql     | connection  | localhost | source | pinwheel |          | 3306 | True         | True               | {}           |

Answers (2)

seve

For anyone else who runs into this issue, this is what I found.

The docs for the MySqlHook class here suggest that the parameter to use for the connection is connection. However, this doesn't work:

hook = MySqlHook(schema='source', connection={'connection': 'mysql_pinwheel_source'})

>>> hook.get_conn()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/mysql/hooks/mysql.py", line 162, in get_conn
    client_name = conn.extra_dejson.get('client', 'mysqlclient')
AttributeError: 'str' object has no attribute 'extra_dejson'
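Why it fails: here is a minimal stand-in (plain Python, no Airflow required) that mimics the lookup in the source excerpt quoted in the question. Whatever is passed as `connection` is used as-is, and `get_connection` is consulted only when `connection` is empty. `FakeConnection`, `FakeHook` and the in-memory `CONNECTIONS` registry are hypothetical simplifications, not Airflow's actual classes.

```python
import json
from typing import Optional


class FakeConnection:
    """Hypothetical stand-in for airflow.models.Connection."""

    def __init__(self, conn_id: str, extra: Optional[str] = None) -> None:
        self.conn_id = conn_id
        self.extra = extra

    @property
    def extra_dejson(self) -> dict:
        # Parse the Extra field as JSON; treat a missing Extra as {}.
        return json.loads(self.extra) if self.extra else {}


# Hypothetical registry standing in for the Airflow metadata database.
CONNECTIONS = {"pinwheel_source": FakeConnection("pinwheel_source")}


class FakeHook:
    """Mimics how the hook chooses between `connection` and a conn id."""

    conn_name_attr = "mysql_conn_id"

    def __init__(self, mysql_conn_id: str = "mysql_default", connection=None) -> None:
        self.mysql_conn_id = mysql_conn_id
        self.connection = connection  # used as-is, never looked up

    def get_connection(self, conn_id: str) -> FakeConnection:
        return CONNECTIONS[conn_id]

    def get_client_name(self) -> str:
        # Same shape as the excerpt: an explicit `connection` wins.
        conn = self.connection or self.get_connection(
            getattr(self, self.conn_name_attr)
        )
        return conn.extra_dejson.get("client", "mysqlclient")


# Passing the conn id as `connection` leaves a plain str in self.connection.
try:
    FakeHook(connection="pinwheel_source").get_client_name()
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'extra_dejson'

# Passing it as mysql_conn_id goes through get_connection() instead.
print(FakeHook(mysql_conn_id="pinwheel_source").get_client_name())  # mysqlclient
```

Routing the id through `mysql_conn_id` makes the lookup go through `get_connection()`, which returns a real connection object with `extra_dejson`.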

The correct parameter name to pass is mysql_conn_id:

>>> hook = MySqlHook(schema='source', mysql_conn_id='mysql_pinwheel_source')
>>> hook.get_conn()
[2021-05-09 10:30:40,293] {base.py:65} INFO - Using connection to: id: mysql_pinwheel_source. Host: localhost, Port: 3306, Schema: source, Login: pinwheel, Password: XXXXXXXX, extra: XXXXXXXX
<_mysql.connection open to 'localhost' at 0x562af438f760>

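I'm not sure whether the connection parameter can be used some other way. Judging from the source excerpt in the question, it appears to expect an already-built connection object rather than a connection id. Either way, passing mysql_conn_id seems to be the solution.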
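Applied to the operator from the question, the loading step might look like the sketch below. It makes two assumptions beyond this answer. First, the helper name `load_dataframe` is hypothetical. Second, the helper writes through `hook.get_sqlalchemy_engine()` instead of the raw `get_conn()` connection. pandas' `to_sql` documents support for SQLAlchemy connectables and `sqlite3` connections, not arbitrary DB-API connections, so the MySQLdb connection from `get_conn()` would likely fail at that step next. The hook class is passed in so the logic can be exercised without Airflow; in the operator you would pass `MySqlHook` itself.

```python
from typing import Any, Callable


def load_dataframe(
    hook_factory: Callable[..., Any],
    conn_id: str,
    schema: str,
    table: str,
    df: Any,
) -> str:
    """Append `df` to `table` using a hook built from a connection *id*.

    `hook_factory` is MySqlHook in Airflow. Any callable that accepts
    mysql_conn_id/schema and returns an object with get_sqlalchemy_engine()
    also works, which keeps this testable without Airflow.
    """
    # Pass the id as mysql_conn_id, not as `connection` (which is used as-is).
    hook = hook_factory(mysql_conn_id=conn_id, schema=schema)
    # Hand pandas a SQLAlchemy engine rather than the raw DB-API connection.
    engine = hook.get_sqlalchemy_engine()
    df.to_sql(table, engine, if_exists="append", index=False)
    return f"Saving data to {table}"


# Inside CryptoToMySql.execute (requires Airflow and the MySQL provider):
#     from airflow.providers.mysql.hooks.mysql import MySqlHook
#     data = self._get_cryptos(self.coins)
#     return load_dataframe(MySqlHook, self.mysql_conn_id, "source",
#                           self.tablename, data)
```

Injecting the hook class is only a testing convenience. Calling `MySqlHook(mysql_conn_id=..., schema=...)` directly inside `execute` is equivalent.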

Elad Kalif

When using MySQL, you can choose which Python client library to use: mysql-connector-python or mysqlclient. The docs mention that you specify the client in the connection's Extra field. If it is omitted, the hook falls back to mysqlclient (see the .get('client', 'mysqlclient') call in the question's source excerpt).

So just add {"client": "mysql-connector-python"} or {"client": "mysqlclient"} to the Extra field.
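To illustrate what that Extra field does: the hook reads `conn.extra_dejson` (the Extra field parsed as JSON) and, as in the source excerpt in the question, defaults to `'mysqlclient'` when no `client` key is present. Below is a stdlib-only sketch of that selection step. `choose_client` is a hypothetical helper. Raising `ValueError` on an unrecognized name is an assumption about how the hook reacts, not a quote of its code.

```python
import json
from typing import Optional

# The two client names mentioned in this answer.
SUPPORTED_CLIENTS = {"mysqlclient", "mysql-connector-python"}


def choose_client(extra: Optional[str]) -> str:
    """Pick the MySQL driver name from a connection's Extra JSON string."""
    extra_dejson = json.loads(extra) if extra else {}  # empty Extra -> {}
    client = extra_dejson.get("client", "mysqlclient")  # default driver
    if client not in SUPPORTED_CLIENTS:
        # Assumption: unknown names are rejected rather than silently ignored.
        raise ValueError(f"Unknown MySQL client name: {client!r}")
    return client


print(choose_client(None))                                    # mysqlclient
print(choose_client('{"client": "mysql-connector-python"}'))  # mysql-connector-python
```

Other keys in Extra (for example a charset setting) do not affect the choice; only `client` does.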

Note that the tests are a good source of information. For example, check this.
