Reputation: 295
My general idea is, that I have a or more test functions which can call another function. So:
Of course, that does not work with a literal return, so my question is, if there is a possibility I do not see.
A practical example:
Test function in pytest - test_config is a fixture:
def test_load_into_database(test_config):
logger.info('Testing load_into_database')
data = {
'filename': 'file_0'
}
accessor = list(database_accessor(test_config))[0]
access_data = accessor['session'].query(accessor['table']).all()
assert access_data[0].filename != data['filename']
accessor['session'].add(accessor['table'](
filename='file_0'
))
accessor['session'].commit()
access_data = accessor['session'].query(accessor['table']).all()
assert access_data[0].filename == data['filename']
This test function calls another one:
def database_accessor(cfg):
setup = 'mysql+pymysql://{}:{}@{}/{}'.format(
cfg['database_user'],
cfg['database_passwd'],
cfg['database_host'],
cfg['database_db']
)
Base = automap_base()
engine = create_engine(setup, echo=False)
Base.prepare(engine, reflect=True)
table = Base.classes.get(cfg['database_table'])
session = Session(engine)
->return and wait<- {
'session': session,
'table': table
}
session.close()
with engine.connect() as con:
con.execution_options(autocommit=True).execute("TRUNCATE TABLE {}".format(cfg['database_table']))
What I want is, that the database_accessor returns the dictionary, then waits for the underlying function (in this case the test_function) to finish, then resumes.
That way, it is possible to use a variable number of test functions with the same database_accessor without executing all the different test functions in the database_accessor.
Callbacks
I know that there is a way, with a callback function, but that doubles my functions, which I do not want. E.g.
def test_load_into_database():
database_accessor(load_into_database_1, var1, var2)
def database_accessor(function, args*):
# do stuff
function(args, stuff)
# do other stuff
def load_into_database_1(args, stuff):
# do something
Upvotes: 0
Views: 297
Reputation: 10452
I think you'd want to make database_accessor
a context manager:
with database_accessor(test_config) as accessor:
access_data = accessor['session'].query(accessor['table']).all()
assert access_data[0].filename != data['filename']
accessor['session'].add(accessor['table'](
filename='file_0'
))
accessor['session'].commit()
access_data = accessor['session'].query(accessor['table']).all()
assert access_data[0].filename == data['filename']
The implementation would looks something like this:
from contextlib import contextmanager
@contextmanager
def database_accessor(cfg):
setup = 'mysql+pymysql://{}:{}@{}/{}'.format(
cfg['database_user'],
cfg['database_passwd'],
cfg['database_host'],
cfg['database_db']
)
Base = automap_base()
engine = create_engine(setup, echo=False)
Base.prepare(engine, reflect=True)
table = Base.classes.get(cfg['database_table'])
session = Session(engine)
try:
yield {
'session': session,
'table': table
}
finally:
session.close()
with engine.connect() as con:
con.execution_options(autocommit=True).execute("TRUNCATE TABLE {}".format(cfg['database_table']))
In this case, I would modify it slightly to yield a tuple instead of a dict:
yield session, table
and then, when using it you could do:
with database_accessor(test_config) as (session, table):
Upvotes: 1