Reputation: 150
I am using peewee with a MariaDB database in a script. The script reads a database table and submits each row from that query to a ThreadPoolExecutor. The worker itself needs to query the database as well. After all futures have finished, the script starts over from the beginning.
When I use a single connection everything works, but since my worker job is mostly network I/O, I suspect that a single connection shared by all worker threads will become the bottleneck.
If I switch to connection pooling, I can watch the number of open connections grow until peewee raises a "too many open connections" error. The connections are never closed on their own, which matches peewee's documentation.
However, I have no idea how to manually open and close the database connection from inside my worker function.
I tried making the database variable in models.py global so that I could access that object in my worker, but watching the open connections on my database made me realise that calling .close()/.connect() had no effect in this case.
I also pasted everything into a single file, but I was still not able to open/close connections manually.
The documentation only gives examples of how to use the pool with different web frameworks.
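What I am after inside each worker is roughly this pattern (just a sketch, assuming the pooled database object from models.py can be imported; this is the part I could not get working):

from models.models import database, MyTableB

def do_work(data):
    # Check a connection out of the pool for this thread, run the query,
    # and hand the connection back when the block exits.
    with database.connection_context():
        MyTableB.create(name="foo")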
My app, simplified:
# app.py
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from models.models import MyTableA, MyTableB

def do_work(data):
    MyTableB.create(name="foo")

def main():
    logger = logging.getLogger()
    data = MyTableA.select().dicts()
    with ThreadPoolExecutor(8) as executor:
        future_to_system = {executor.submit(do_work, d): d['id'] for d in data}
        for future in as_completed(future_to_system):
            system = future_to_system[future]
            try:
                future.result()
            except Exception as exc:
                logger.info('%r generated an exception: %s' % (system, exc))
            else:
                logger.debug('%r result is %s' % (system, "ok"))

if __name__ == '__main__':
    main()
models.py

from peewee import *
from playhouse.pool import PooledMySQLDatabase

# Uncomment this line to use the pool:
# database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})

# Comment this line out when using the pool (single shared connection):
database = MySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass'})

class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = database

class MyTableA(BaseModel):
    name = CharField()

    class Meta:
        table_name = 'my_table'

class MyTableB(BaseModel):
    name = CharField()

    class Meta:
        table_name = 'my_table'
If anyone has an idea of how to use peewee's connection pool together with ThreadPoolExecutor, I would be very thankful.
Upvotes: 1
Views: 1915
Reputation: 150
I have found a solution here. The key changes are: leave the models unbound by setting database = None in BaseModel.Meta, create the PooledMySQLDatabase in app.py and bind the models at runtime with database.bind(), and let each worker check its own connection out of the pool with a with-block on the database.
models.py

from peewee import *

class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = None

class MyTableA(BaseModel):
    name = CharField()

    class Meta:
        table_name = 'my_table'

class MyTableB(BaseModel):
    name = CharField()

    class Meta:
        table_name = 'my_table'
app.py

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from playhouse.pool import PooledMySQLDatabase
from models.models import MyTableA, MyTableB

def do_work(data, db):
    # Each worker thread checks its own connection out of the pool and
    # returns it to the pool when the block exits.
    with db:
        MyTableB.create(name="foo")

def main():
    logger = logging.getLogger()
    database = PooledMySQLDatabase('db_name', **{'charset': 'utf8', 'sql_mode': 'PIPES_AS_CONCAT', 'use_unicode': True, 'user': 'user', 'passwd': 'pass', 'max_connections': 32, 'stale_timeout': 300})
    # Bind the models to the pooled database at runtime.
    database.bind([MyTableA, MyTableB])
    with database:
        data = MyTableA.select().dicts()
        with ThreadPoolExecutor(8) as executor:
            # Pass the database into the worker so it can grab its own connection.
            future_to_system = {executor.submit(do_work, d, database): d['id'] for d in data}
            for future in as_completed(future_to_system):
                system = future_to_system[future]
                try:
                    future.result()
                except Exception as exc:
                    logger.info('%r generated an exception: %s' % (system, exc))
                else:
                    logger.debug('%r result is %s' % (system, "ok"))

if __name__ == '__main__':
    main()
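A note on the worker: with db: is the concise form; the same thing can be written out explicitly, which makes it clearer what the pool is doing (a minimal sketch of an equivalent worker, using only peewee's connect() and close()):

def do_work(data, db):
    # Explicitly check a connection out of the pool for this thread.
    db.connect(reuse_if_open=True)
    try:
        MyTableB.create(name="foo")
    finally:
        # For a pooled database, close() returns the connection to the pool
        # rather than closing the underlying socket.
        db.close()

Also make sure max_connections is at least the number of executor workers plus one for the main thread (here 8 + 1, well within 32); otherwise the pool will refuse to hand out further connections.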
Upvotes: 1