Reputation: 185
I'm trying to write an operator that will download some API data and place it into a table using a dataframe. I've got the following operator code written up:
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from typing import List
from pycoingecko import CoinGeckoAPI
import pandas as pd
class CryptoToMySql(BaseOperator):
    """Fetch current USD prices for a list of coins from the CoinGecko API
    and append them to a MySQL table.

    :param name: Human-readable name for this task.
    :param coins: CoinGecko coin ids to fetch (e.g. ['bitcoin', 'ethereum']).
    :param mysql_conn_id: Airflow connection id of the target MySQL database.
    :param tablename: Destination table; created by pandas if it does not exist.
    """

    @apply_defaults
    def __init__(
            self,
            name: str,
            coins: List[str],
            mysql_conn_id: str = None,
            tablename: str = None,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name
        self.coins = coins
        self.mysql_conn_id = mysql_conn_id
        self.tablename = tablename

    @staticmethod
    def _get_cryptos(coins):
        """Return a DataFrame with one row per coin: price, market cap,
        24h volume, 24h change, and last-updated timestamp (all vs USD)."""
        cg = CoinGeckoAPI()
        data = cg.get_price(ids=coins, vs_currencies='usd',
                            include_market_cap=True, include_24hr_vol=True,
                            include_24hr_change=True, include_last_updated_at=True)
        # get_price returns {coin_id: {metric: value}}; pivot the coin ids
        # out of the index into a regular 'crypto' column.
        df = pd.DataFrame.from_dict(data, orient='index').reset_index()
        df.rename(columns={'index': 'crypto'}, inplace=True)
        return df

    def execute(self, context):
        """Download the price data and append it to ``self.tablename``."""
        # BUG FIX: the hook's keyword for an Airflow connection id (a string)
        # is `mysql_conn_id`. Passing `connection=<str>` makes the hook treat
        # the string as a Connection object, which fails inside get_conn()
        # with: AttributeError: 'str' object has no attribute 'extra_dejson'.
        hook = MySqlHook(schema='source', mysql_conn_id=self.mysql_conn_id)
        # BUG FIX: pandas DataFrame.to_sql only accepts a raw DBAPI connection
        # for sqlite; a MySQL target needs a SQLAlchemy connectable.
        engine = hook.get_sqlalchemy_engine()
        data = self._get_cryptos(self.coins)
        data.to_sql(self.tablename, engine, if_exists='append', index=False)
        message = f" Saving data to {self.tablename}"
        print(message)
        return message
and the following DAG:
from datetime import timedelta, datetime
from airflow import DAG
from operators.operators import CryptoToMySql
from airflow.operators.dummy import DummyOperator

# CoinGecko coin ids to load each run.
coins = ['bitcoin', 'litecoin', 'ethereum', 'dogecoin']

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    # BUG FIX: a dag_id may only contain alphanumerics, dashes, dots and
    # underscores — 'crypt dag' (with a space) is rejected by Airflow.
    'crypto_dag',
    default_args=default_args,
    description='Pulls various crypto prices every interval',
    schedule_interval='@hourly',
    start_date=datetime(2021, 5, 9),
    tags=['crypto']
) as dag:
    t1 = DummyOperator(
        task_id='dummy-1'
    )
    t2 = CryptoToMySql(
        task_id='load_data',
        name='crypto_task',
        coins=coins,
        mysql_conn_id='pinwheel_source',
        tablename='stonks'
    )
    t1 >> t2
However I get the following error at the hook = get_conn()
call:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.9/site-packages/airflow/providers/mysql/hooks/mysql.py", line 162, in get_conn
client_name = conn.extra_dejson.get('client', 'mysqlclient')
AttributeError: 'str' object has no attribute 'extra_dejson'
Further research into the source code of the MySqlHook class reveals the following line where the issue arises:
conn = self.connection or self.get_connection(
getattr(self, self.conn_name_attr)
) # pylint: disable=no-member
client_name = conn.extra_dejson.get('client', 'mysqlclient')
The extra_dejson property tries to unpack a JSON connection string to get the data. However, nowhere is this described in the documentation, and the MySqlHook code simply expects the conn_id to be passed as described here and here and here
I've confirmed that my connection being passed exists in the DB and that the AIRFLOW_HOME
variable is pointing to the correct directory.
The connection:
id | conn_id | conn_type | descriptio | host | schema | login | password | port | is_encrypt | is_extra_e | extra_dejs | get_uri
| | | n | | | | | | ed | ncrypted | on |
===+============+===========+============+===========+========+==========+============+======+============+============+============+===========
48 | pinwheel_source | mysql | connection | localhost | source | pinwheel | | | 3306 | True | True | {} |
Upvotes: 2
Views: 2626
Reputation: 185
To anyone else who has this issue, this is what I found.
The docs for the MySQLHook class here reference that the parameter you use for the connection is connection
. However this doesn't work.
hook = MySqlHook(schema='source', connection={'connection': 'mysql_pinwheel_source'})
>>> hook.get_conn()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.9/site-packages/airflow/providers/mysql/hooks/mysql.py", line 162, in get_conn
client_name = conn.extra_dejson.get('client', 'mysqlclient')
AttributeError: 'str' object has no attribute 'extra_dejson'
The correct parameter name to pass is mysql_conn_id
:
>>> hook = MySqlHook(schema='source', mysql_conn_id='mysql_pinwheel_source')
>>> hook.get_conn()
[2021-05-09 10:30:40,293] {base.py:65} INFO - Using connection to: id: mysql_pinwheel_source. Host: localhost, Port: 3306, Schema: source, Login: pinwheel, Password: XXXXXXXX, extra: XXXXXXXX
<_mysql.connection open to 'localhost' at 0x562af438f760>
I'm not sure if there is some other parameter needed in order to use the connection
variable, but this seems to be the solution.
Upvotes: 3
Reputation: 15961
When using MySQL, you are given a choice of which Python wrapper you want to use. You can use mysql-connector-python
or mysqlclient
. It is mentioned in the docs that you need to specify the client to connect in the Extra
field.
So just add {"client": "mysql-connector-python"}
or {"client": "mysqlclient"}
to the extra field.
Note that a good source for information is the tests. For example check this
Upvotes: 0