Reputation: 3184
I'm writing an asyncio script to retrieve stock bars data from Interactive Brokers via the ib_insync library.
While I have the script working, the performance is similar to a serial script. I was hoping to see a drastic improvement in speed. This code will be used in production.
I am new to asyncio and feel like I'm missing an important element. Below is the full script. Would very much appriciate assistance in speeding this up. Thanks.
import asyncio
import ib_insync as ibi
import nest_asyncio
import pandas as pd
nest_asyncio.apply()
class App:
async def run(self, symbols):
print(f"1 start run: {symbols}")
self.ib = ibi.IB()
with await self.ib.connectAsync("127.0.0.1", "****", clientId="****"):
contracts = [ibi.Stock(symbol, "SMART", "USD") for symbol in symbols]
bars_dict = dict()
print(f"2 start loop: {symbols}")
for contract in contracts:
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime="",
durationStr="1 M",
barSizeSetting="1 day",
whatToShow="ADJUSTED_LAST",
useRTH=True,
)
# Convert to dataframes.
bars_dict[contract.symbol] = ibi.util.df(bars)
print(f"3 End bars: {symbols}")
return bars_dict
async def main(self):
res = await asyncio.gather(self.run(self.sp500(0, 100)))
return res
def stop(self):
self.ib.disconnect()
def sp500(self, start=None, end=10):
payload = pd.read_html(
"https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
)
first_table = payload[0]
sp500 = first_table["Symbol"].sort_values().to_list()
return sp500[start:end]
if __name__ == "__main__":
import time
start = time.time()
app = App()
try:
print(f"START CALL")
res = asyncio.run(app.main())
print(f"END CALL")
except (KeyboardInterrupt, SystemExit):
app.stop()
for ticker, bars in res[0].items():
print(f"{ticker}\n{bars}")
print(f"Total time: {(time.time() - start)}")
Upvotes: 2
Views: 249
Reputation: 154876
Your script is running in sequence. The call to asyncio.gather()
in main
is useless because it is invoked with just one coroutine. You're supposed to call it with multiple coroutines to have them run in parallel.
For example, you could remove the asyncio.gather()
from main
(just await self.run(self.sp500(0, 100)
there) and instead use it to parallelize calls to reqHistoricalDataAsync
:
class App:
async def run(self, symbols):
print(f"1 start run: {symbols}")
self.ib = ibi.IB()
with await self.ib.connectAsync("127.0.0.1", "****", clientId="****"):
contracts = [ibi.Stock(symbol, "SMART", "USD") for symbol in symbols]
print(f"2 start loop: {symbols}")
all_bars = await asyncio.gather(*[
self.ib.reqHistoricalDataAsync(
contract,
endDateTime="",
durationStr="1 M",
barSizeSetting="1 day",
whatToShow="ADJUSTED_LAST",
useRTH=True,
)
for contract in contracts
])
bars_dict = {}
for contract, bars in zip(contracts, all_bars):
# Convert to dataframes.
bars_dict[contract.symbol] = ibi.util.df(bars)
print(f"3 End bars: {symbols}")
return bars_dict
Upvotes: 1