Reputation: 764
Info:
The exception traceback is as follows.
Exception ignored in: <bound method Connection.__del__ of <aiomysql.connection.Connection object at 0x00000030F8080B38>>
Traceback (most recent call last):
File "C:\software\development\python3.5\lib\site-packages\aiomysql\connection.py", line 689, in __del__
File "C:\software\development\python3.5\lib\site-packages\aiomysql\connection.py", line 261, in close
File "C:\software\development\python3.5\lib\asyncio\selector_events.py", line 569, in close
File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 447, in call_soon
File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 456, in _call_soon
File "C:\software\development\python3.5\lib\asyncio\base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
I implement a simple ORM framework with some functions to process SQL.Some related codes in orm.py(just ignore the chinese annotation) as follows. The update or findAll method in class Model works well and really give the results correctly but every time after I running the test_method,it gives the exception.
@asyncio.coroutine
def create_pool(loop, **kw): # 引入关键字后不用显示import asyncio了
# 该函数用于创建连接池
global __pool # 全局变量用于保存连接池
__pool = yield from aiomysql.create_pool(
host=kw.get('host', 'localhost'), # 默认定义host名字为localhost
port=kw.get('port', 3306), # 默认定义mysql的默认端口是3306
user=kw['user'], # user是通过关键字参数传进来的
password=kw['password'], # 密码也是通过关键字参数传进来的
db=kw['db'], # 数据库的名字
charset=kw.get('charset', 'utf8'), # 默认数据库字符集是utf8
autocommit=kw.get('autocommit', True), # 默认自动提交事务
maxsize=kw.get('maxsize', 10), # 连接池最多同时处理10个请求
minsize=kw.get('minsize', 1), # 连接池最少1个请求
loop=loop # 传递消息循环对象loop用于异步执行
)
@asyncio.coroutine
def execute(sql, args, autocommit=True):
# execute方法只返回结果数,不返回结果集,用于insert,update这些SQL语句
log(sql)
with (yield from __pool) as conn:
if not autocommit:
yield from conn.begin()
try:
cur = yield from conn.cursor()
# 执行sql语句,同时替换占位符
# pdb.set_trace()
yield from cur.execute(sql.replace('?', '%s'), args)
affected = cur.rowcount # 返回受影响的行数
yield from cur.close() # 关闭游标
if not autocommit:
yield from conn.commit()
except BaseException as e:
if not autocommit:
yield from conn.rollback()
raise e # raise不带参数,则把此处的错误往上抛;为了方便理解还是建议加e吧
return affected
class Model(dict, metaclass=ModelMetaclass):
# 继承dict是为了使用方便,例如对象实例user['id']即可轻松通过UserModel去数据库获取到id
# 元类自然是为了封装我们之前写的具体的SQL处理函数,从数据库获取数据
def __init__(self, **kw):
# 调用dict的父类__init__方法用于创建Model,super(类名,类对象)
super(Model, self).__init__(**kw)
def __getattr__(self, key):
# 调用不存在的属性时返回一些内容
try:
return self[key] # 如果存在则正常返回
except KeyError:
raise AttributeError(
r"'Model' object has no attribute '%s'" % key) # r表示不转义
def __setattr__(self, key, value):
# 设定Model里面的key-value对象,这里value允许为None
self[key] = value
def getValue(self, key):
# 获取某个具体的值,肯定存在的情况下使用该函数,否则会使用__getattr()__
# 获取实例的key,None是默认值,getattr方法使用可以参考http://kaimingwan.com/post/python/pythonzhong-de-nei-zhi-han-shu-getattr-yu-fan-she
return getattr(self, key, None)
def getValueOrDefault(self, key):
# 这个方法当value为None的时候能够返回默认值
value = getattr(self, key, None)
if value is None: # 不存在这样的值则直接返回
# self.__mapping__在metaclass中,用于保存不同实例属性在Model基类中的映射关系
field = self.__mappings__[key]
if field.default is not None: # 如果实例的域存在默认值,则使用默认值
# field.default是callable的话则直接调用
value = field.default() if callable(field.default) else field.default
logging.debug('using default value for %s:%s' %
(key, str(value)))
setattr(self, key, value)
return value
# --------------------------每个Model类的子类实例应该具备的执行SQL的方法比如save------
@classmethod # 类方法
@asyncio.coroutine
def findAll(cls, where=None, args=None, **kw):
sql = [cls.__select__] # 获取默认的select语句
if where: # 如果有where语句,则修改sql变量
# 这里不用协程,是因为不需要等待数据返回
sql.append('where') # sql里面加上where关键字
sql.append(where) # 这里的where实际上是colName='xxx'这样的条件表达式
if args is None: # 什么参数?
args = []
orderBy = kw.get('orderBy', None) # 从kw中查看是否有orderBy属性
if orderBy:
sql.append('order by')
sql.append(orderBy)
limit = kw.get('limit', None) # mysql中可以使用limit关键字
if limit is not None:
sql.append('limit')
if isinstance(limit, int): # 如果是int类型则增加占位符
sql.append('?')
args.append(limit)
elif isinstance(limit, tuple) and len(limit) == 2: # limit可以取2个参数,表示一个范围
sql.append('?,?')
args.extend(limit)
else: # 其他情况自然是语法问题
raise ValueError('Invalid limit value: %s' % str(limit))
# 在原来默认SQL语句后面再添加语句,要加个空格
rs = yield from select(' '.join(sql), args)
return [cls(**r) for r in rs] # 返回结果,结果是list对象,里面的元素是dict类型的
@classmethod
@asyncio.coroutine
def findNumber(cls, selectField, where=None, args=None):
# 获取行数
# 这里的 _num_ 什么意思?别名? 我估计是mysql里面一个记录实时查询结果条数的变量
sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]
# pdb.set_trace()
if where:
sql.append('where')
sql.append(where) # 这里不加空格?
rs = yield from select(' '.join(sql), args, 1) # size = 1
if len(rs) == 0: # 结果集为0的情况
return None
return rs[0]['_num_'] # 有结果则rs这个list中第一个词典元素_num_这个key的value值
@classmethod
@asyncio.coroutine
def find_by_key(cls, pk):
# 根据主键查找
# pk是dict对象
rs = yield from select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
if len(rs) == 0:
return None
return cls(**rs[0])
# 这个是实例方法
@asyncio.coroutine
def save(self):
# arg是保存所有Model实例属性和主键的list,使用getValueOrDefault方法的好处是保存默认值
# 将自己的fields保存进去
args = list(map(self.getValueOrDefault, self.__fields__))
args.append(self.getValueOrDefault(self.__primary_key__))
# pdb.set_trace()
rows = yield from execute(self.__insert__, args) # 使用默认插入函数
if rows != 1:
# 插入失败就是rows!=1
logging.warn(
'failed to insert record: affected rows: %s' % rows)
@asyncio.coroutine
def update(self):
# 这里使用getValue说明只能更新那些已经存在的值,因此不能使用getValueOrDefault方法
args = list(map(self.getValue, self.__fields__))
args.append(self.getValue(self.__primary_key__))
# pdb.set_trace()
rows = yield from execute(self.__update__, args) # args是属性的list
if rows != 1:
logging.warn(
'failed to update by primary key: affected rows: %s' % rows)
@asyncio.coroutine
def remove(self):
args = [self.getValue(self.__primary_key__)]
# pdb.set_trace()
rows = yield from execute(self.__delete__, args)
if rows != 1:
logging.warn(
'failed to remove by primary key: affected rows: %s' % rows)
The test file is as follows.
from models import User
import asyncio
import sys
import orm
import pdb
import time
# import pdb
# 测试插入
@asyncio.coroutine
def test_save(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
u = User(name='hi', email='[email protected]',
passwd='hi', image='about:blank')
# pdb.set_trace()
yield from u.save()
# 测试查询
@asyncio.coroutine
def test_findAll(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 这里给的关键字参数按照xxx='xxx'的形式给出,会自动分装成dict
rs = yield from User.findAll(email='[email protected]') # rs是一个元素为dict的list
# pdb.set_trace()
for i in range(len(rs)):
print(rs[i])
# 查询条数?
@asyncio.coroutine
def test_findNumber(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
count = yield from User.findNumber('email')
print(count)
# 根据主键查找,这里试ID
@asyncio.coroutine
def test_find_by_key(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# rs是一个dict
# ID请自己通过数据库查询
rs = yield from User.find_by_key('0014531826762080b29033a78624bc68c867550778f64d6000')
print(rs)
# 根据主键删除
@asyncio.coroutine
def test_remove(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 用id初始化一个实例对象
u = User(id='0014531826762080b29033a78624bc68c867550778f64d6000')
yield from u.remove()
# 根据主键更新
@asyncio.coroutine
def test_update(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 必须按照列的顺序来初始化:'update `users` set `created_at`=?, `passwd`=?, `image`=?,
# `admin`=?, `name`=?, `email`=? where `id`=?' 注意这里要使用time()方法,否则会直接返回个时间戳对象,而不是float值
u = User(id='00145318300622886f186530ee74afabecedb42f9cd590a000', created_at=time.time(), passwd='test',
image='about:blank', admin=True, name='test', email='[email protected]') # id必须和数据库一直,其他属性可以设置成新的值,属性要全
# pdb.set_trace()
yield from u.update()
loop = asyncio.get_event_loop() # 获取消息循环对象
loop.run_until_complete(test_update(loop)) # 执行协程
loop.close()
Upvotes: 2
Views: 2423
Reputation: 764
I try to put "run_until_complete" method into a coroutine(see the execute_test method at the end of this code), and it seems worked. But I still don't know why.
from models import User
import asyncio
import sys
import orm
import pdb
import time
# import pdb
# 测试插入
@asyncio.coroutine
def test_save(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
u = User(name='hi', email='[email protected]',
passwd='hi', image='about:blank')
# pdb.set_trace()
yield from u.save()
# 测试查询
@asyncio.coroutine
def test_findAll(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 这里给的关键字参数按照xxx='xxx'的形式给出,会自动分装成dict
rs = yield from User.findAll(email='[email protected]') # rs是一个元素为dict的list
# pdb.set_trace()
for i in range(len(rs)):
print(rs[i])
# 查询条数?
@asyncio.coroutine
def test_findNumber(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
count = yield from User.findNumber('email')
print(count)
# 根据主键查找,这里试ID
@asyncio.coroutine
def test_find_by_key(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# rs是一个dict
# ID请自己通过数据库查询
rs = yield from User.find_by_key('0014531826762080b29033a78624bc68c867550778f64d6000')
print(rs)
# 根据主键删除
@asyncio.coroutine
def test_remove(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 用id初始化一个实例对象
u = User(id='0014531826762080b29033a78624bc68c867550778f64d6000')
yield from u.remove()
# 根据主键更新
@asyncio.coroutine
def test_update(loop):
yield from orm.create_pool(loop, user='kami', password='kami', db='pure_blog')
# 必须按照列的顺序来初始化:'update `users` set `created_at`=?, `passwd`=?, `image`=?,
# `admin`=?, `name`=?, `email`=? where `id`=?' 注意这里要使用time()方法,否则会直接返回个时间戳对象,而不是float值
u = User(id='00145318300622886f186530ee74afabecedb42f9cd590a000', created_at=time.time(), passwd='test',
image='about:blank', admin=True, name='test', email='[email protected]') # id必须和数据库一直,其他属性可以设置成新的值,属性要全
# pdb.set_trace()
yield from u.update()
@asyncio.coroutine
def execute_test(loop):
yield from loop.run_until_complete(test_update(loop)) # 执行协程
yield from loop.close()
loop = asyncio.get_event_loop() # 获取消息循环对象
execute_test(loop)
Upvotes: -1
Reputation: 101
Before closing event loop, you need to close connection pool, see docs
pool.close()
yield from pool.wait_closed()
in your case:
loop = asyncio.get_event_loop()
loop.run_until_complete(test_update(loop))
__pool.close()
loop.run_until_complete(__pool.wait_closed())
loop.close()
Upvotes: 4