Reputation: 2127
Betfairlightweight API: https://github.com/betcode-org/betfair
To work with this module, you have to create an APIClient with your credentials and log in:
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
To speed up the data collection process, I'm using multiprocessing:
import multiprocessing

import betfairlightweight

trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()

def main():
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count() - 1 or 1
        pool = multiprocessing.Pool(max_process)
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()
        trading.logout()

def data_event(event_bf):
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS']
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ...  # some more code
That way, 12 logins are made, which is not the ideal way to access an API.
Why 12 logins?
When I run the code, it makes 1 login; when the multiprocessing pool is created, it generates 11 more, one for each process. If I put print(trading) right below trading.login(), one print appears in the terminal when the code starts, and then 11 more appear simultaneously when the pool is created.
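For reference, here is a minimal, standalone sketch (not my project code) that reproduces this behaviour: under the spawn start method (the default on Windows and recent macOS), every worker process re-imports the script, so anything at module level, like the login above, runs once per worker.

import multiprocessing
import os

# This line sits at module level, so under "spawn" it runs again in every
# worker process when the pool re-imports this file (just like the login call).
print(f"module-level code ran in process {os.getpid()}")

def work(x):
    return x * x

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn", force=True)  # force spawn to demonstrate
    with multiprocessing.Pool(3) as pool:
        print(pool.map(work, range(3)))  # expect 3 extra module-level prints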
So I need to find a way to do this same job using only ONE login.
I tried to move the login inside main() and pass the client as an argument when calling the function:
import multiprocessing
from itertools import repeat

import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count() - 1 or 1
        pool = multiprocessing.Pool(max_process)
        list_pool = pool.starmap(data_event, zip(repeat(trading), matches_bf.iterrows()))
    finally:
        pool.close()
        pool.join()
        trading.logout()

def data_event(trading, event_bf):
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS']
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ...  # some more code
But the error encountered is:
TypeError: cannot pickle 'module' object
I tried to create trading inside the data_event function:
import multiprocessing

import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count() - 1 or 1
        pool = multiprocessing.Pool(max_process)
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()
        trading.logout()

def data_event(event_bf):
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS']
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ...  # some more code
But the error encountered is:
'errorCode': 'INVALID_SESSION_INFORMATION'
The reason is obvious: the client created in each process never logged in.
How should I proceed so that I use only one login and can still do everything I need, without being forced to process events one by one (running without multiprocessing takes far too long and is not feasible)?
Upvotes: 0
Views: 293
Reputation: 43088
Pass a session for the betfairlightweight.APIClient instance to be pickleable:
import requests

trading = betfairlightweight.APIClient(
    username,
    pw,
    app_key=app_key,
    cert_files=('blablabla.crt', 'blablabla.key'),
    session=requests.Session(),  # Add this
)
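With a pickleable session in place, the logged-in client can be handed to the worker processes, for example with the zip(repeat(trading), ...) / starmap approach from the question. A minimal, untested sketch of that idea (the worker body is a placeholder, and it assumes the pickled copy of the client keeps a usable session token in each worker, which is worth verifying against Betfair's session rules):

import multiprocessing
from itertools import repeat

import requests
import betfairlightweight

def data_event(trading, event_bf):
    _, event_bf = event_bf
    # ... same list_market_catalogue logic as in the question
    return event_bf['event_id']

def main():
    trading = betfairlightweight.APIClient(
        username, pw, app_key=app_key,
        cert_files=('blablabla.crt', 'blablabla.key'),
        session=requests.Session(),  # makes the client instance pickleable
    )
    trading.login()
    matches_bf = ...  # DataFrame..., as in the question
    try:
        pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1 or 1)
        # each worker receives a pickled copy of the logged-in client
        list_pool = pool.starmap(data_event, zip(repeat(trading), matches_bf.iterrows()))
    finally:
        pool.close()
        pool.join()
        trading.logout()

if __name__ == '__main__':
    main()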
The TypeError: cannot pickle 'module' object happens because APIClient (via BaseClient) defaults self.session to the requests module itself, and modules cannot be pickled:
class APIClient(BaseClient):
    ...

class BaseClient:
    ...
    def __init__(
        self,
        username: str,
        password: str = None,
        app_key: str = None,
        certs: str = None,
        locale: str = None,
        cert_files: Union[Tuple[str], str, None] = None,
        lightweight: bool = False,
        session: requests.Session = None,
    ):
        ...
        self.session = session if session else requests
        ...
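A quick standalone check (my own illustration, not part of the library) of what pickles and what doesn't:

import pickle
import requests

try:
    pickle.dumps(requests)            # the module, as used by the default
except TypeError as exc:
    print(exc)                        # -> cannot pickle 'module' object

pickle.dumps(requests.Session())      # a Session instance pickles without error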
Upvotes: 1
Reputation: 8111
Each worker process re-executes the module-level login when it imports your script, so moving the login logic into a function that is only invoked under if __name__ == "__main__": shields it from the workers and should fix the repeated logins.
import multiprocessing

import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    trading.keep_alive()
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count() - 1 or 1
        pool = multiprocessing.Pool(max_process)
        m = multiprocessing.Manager()
        queue = m.Queue()
        queue.put(trading)
        list_pool = pool.starmap(data_event, [(queue, row) for row in matches_bf.iterrows()])
    finally:
        pool.close()
        pool.join()
        trading.logout()

def data_event(queue, event_bf):
    trading = queue.get()
    _, event_bf = event_bf
    ...  # some more code
    queue.put(trading)

if __name__ == "__main__":
    main()
The main issue here is that the trading object (basically the API client returned by the trading library) cannot be serialized, which is why multiprocessing fails to pickle it and send it to the worker processes. As I see it, there's no "direct" solution to your question; however, you can try either of these workarounds:
- Use dill instead of pickle as the serializer.
- Instead of multiprocessing.pool.Pool, use multiprocessing.pool.ThreadPool (see the sketch below). Since it shares memory with the main thread, each worker wouldn't need to create a new trading object. This might come at a performance hit in comparison to Pool though.
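A minimal sketch of the ThreadPool variant (my own illustration; the thread count and the data_event body are placeholders): all threads share the single logged-in client, so nothing has to be pickled.

from multiprocessing.pool import ThreadPool

import betfairlightweight

# One login at module level is fine here: threads do not re-import the module,
# and they all share this single logged-in client, so nothing has to be pickled.
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()

matches_bf = ...  # the events DataFrame from the question

def data_event(event_bf):
    _, event_bf = event_bf
    # ... same list_market_catalogue logic as in the question
    return event_bf['event_id']

try:
    pool = ThreadPool(8)  # thread count is a placeholder, tune as needed
    list_pool = pool.map(data_event, matches_bf.iterrows())
finally:
    pool.close()
    pool.join()
    trading.logout()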
Upvotes: 1