Trevor Seibert

Reputation: 119

Python: How to implement concurrent futures to a function

I was wondering what would be a good way to use concurrent.futures to iterate through a large list of stocks in my New Program.

In my previous program I tried using concurrent.futures, but the printed data was not consistent. For example, when running a large list of stocks, it gives different information each time (compare Output 1 and Output 2 for the previous program). I am including my previous program so you can see what I did wrong when implementing concurrent futures.

Thanks!

New Program

import time

import pandas as pd
import yfinance as yf

tickers = ["A","AA","AAC","AACG","AACIU","AADI","AAIC","AAIN","AAL","AAMC","AAME","AAN","AAOI","AAON","AAP","AAPL"]
start = time.time()

def create_df(tickers):
    all_info = []
    for ticker in tickers:
        all_info.append(yf.Ticker(ticker).info)
        
    df = pd.DataFrame.from_records(all_info)
    df = df[['symbol','ebitda', 'enterpriseValue', 'trailingPE', 'sector']]
    df.dropna(inplace=True)
    # This is where you can add calculations and other columns not in Yfinance Library
    df['EV/Ratio'] = df['enterpriseValue'] / df['ebitda']
    return df

df = create_df(tickers)
print(df)
print('It took', time.time()-start, 'seconds.')

Output

   symbol        ebitda  enterpriseValue  trailingPE              sector   EV/Ratio
0       A  1.762000e+09     5.311271e+10   60.754720          Healthcare  30.143422
9    AAMC -2.015600e+07     1.971329e+08    1.013164  Financial Services  -9.780359
10   AAME  2.305600e+07     1.175756e+08    7.652329  Financial Services   5.099566
11    AAN  8.132960e+08     1.228469e+09    9.329710   Consumer Cyclical   1.510483
13   AAON  1.178790e+08     3.501286e+09   55.615944         Industrials  29.702376
14    AAP  1.239876e+09     1.609877e+10   25.986680   Consumer Cyclical  12.984181
15   AAPL  1.109350e+11     2.489513e+12   33.715443          Technology  22.441190
It took 101.81006002426147 seconds.

Previous Program For Reference

import concurrent.futures
import time

import pandas as pd
import yfinance as yf

tickers = ["A","AA","AAC","AACG","AACIU","AADI","AAIC","AAIN","AAL","AAMC","AAME","AAN","AAOI","AAON","AAP","AAPL"]
start = time.time()

col_a = []  
col_b = []  
col_c = []  
col_d = []  

print('Loading Data... Please wait for results')


def do_something(tickers):
    print('---', tickers, '---')
    all_info = yf.Ticker(tickers).info
    try:
        a = all_info.get('ebitda')
        b = all_info.get('enterpriseValue')
        c = all_info.get('trailingPE')
        d = all_info.get('sector')
    except:
        None
    col_a.append(a)  
    col_b.append(b)  
    col_c.append(c)  
    col_d.append(d)     
    return
with concurrent.futures.ThreadPoolExecutor() as executer:
    executer.map(do_something, tickers)
        

# Dataframe Set Up
pd.set_option("display.max_rows", None)
   
df = pd.DataFrame({
    'Ticker': tickers,
    'Ebitda': col_a,  
    'EnterpriseValue' :col_b,  
    'PE Ratio': col_c,  
    'Sector': col_d,
})
print(df.dropna())
print(len('Total Companies with Information'))
print('It took', time.time()-start, 'seconds.')

Output 1 for Previous Program

   Ticker        Ebitda  EnterpriseValue   PE Ratio              Sector
1      AA  1.651000e+09     5.031802e+10  49.183292          Healthcare
3    AACG  2.216000e+09     1.168140e+10  11.711775     Basic Materials
5    AADI  1.928800e+07     1.108360e+08   6.954397  Financial Services
7    AAIN  1.128370e+08     3.960835e+09  57.706764         Industrials
8     AAL  8.303301e+08     1.103969e+09   9.111819   Consumer Cyclical
10   AAME  1.202330e+11     2.534678e+12  26.737967          Technology
12   AAOI -1.848400e+07     1.277540e+08   0.355233  Financial Services
14    AAP  1.224954e+09     1.770882e+10  26.059464   Consumer Cyclical
32
It took 4.2548089027404785 seconds.

Output 2 for Previous Program

   Ticker        Ebitda  EnterpriseValue   PE Ratio              Sector
0       A -1.848400e+07     1.277540e+08   0.355233  Financial Services
4   AACIU  1.202330e+11     2.534678e+12  26.737967          Technology
5    AADI  1.651000e+09     5.031802e+10  49.183292          Healthcare
7    AAIN  1.128370e+08     3.960835e+09  57.706764         Industrials
9    AAMC  8.303301e+08     1.103969e+09   9.111819   Consumer Cyclical
10   AAME  2.216000e+09     1.168140e+10  11.711775     Basic Materials
13   AAON  1.224954e+09     1.770882e+10  26.059464   Consumer Cyclical
14    AAP  1.928800e+07     1.108360e+08   6.954397  Financial Services
32
It took 4.003742933273315 seconds.

Upvotes: 0

Views: 1454

Answers (2)

Paul Cornelius

Reputation: 10916

You have a multithreaded program. ThreadPoolExecutor.map submits one call to do_something() per ticker to a pool of worker threads that run concurrently, and you have no control over the order in which those calls execute or finish. The problem occurs because you append the results (a, b, c, d) to the individual lists col_a, col_b, etc. inside do_something. Those lists are global, so the data gets appended to them in more-or-less random order. It is even possible for a thread switch to occur right in the middle of the four calls to append(). So the order of the data will be random, and the individual rows might be mixed up.

The list of ticker symbols is added to the dataframe in the main thread. So the list of symbols and the data itself are not synchronized. That's exactly what you observe.
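
The misalignment can be reproduced without yfinance. The sketch below is only an illustration: slow_lookup and its delays are made up to stand in for network requests of varying length, and the shared list plays the role of col_a.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for yf.Ticker(...).info: each "request" takes a different time.
DELAYS = {"A": 0.3, "AA": 0.2, "AAC": 0.1}

shared = []  # plays the role of the global col_a list


def slow_lookup(ticker):
    time.sleep(DELAYS[ticker])  # simulate network latency
    shared.append(ticker)       # appended in completion order, not input order


tickers = list(DELAYS)
with ThreadPoolExecutor(max_workers=3) as executor:
    # Consume the iterator so an exception in a worker would surface here.
    list(executor.map(slow_lookup, tickers))

print("input order:     ", tickers)
print("completion order:", shared)
```

With these delays the fastest request usually finishes first, so the shared list ends up in a different order than tickers, which is the same effect as the shifted rows in Output 1 and Output 2.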

The easiest solution is to set up all your data structures in the main thread. This is easy to do because the function map() returns an iterator, and the order of iteration is guaranteed to be preserved. The iterator steps over the values returned by do_something(). So instead of trying to update the lists col_a, col_b, etc. in that function, just return the values a, b, c, d as a tuple. Back in your main thread, you take these values and append them to the columns.

The order in which the threads execute is not controlled, but map() sorts it out for you: it yields the results in the same order as the input list, waiting for each result in turn if it is not ready yet.
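
To see the ordering guarantee in isolation, here is a minimal sketch that does not need yfinance. The function slow_lookup and the delays are hypothetical; they only simulate requests that finish in reverse order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical delays: the first ticker is the slowest, the last is the fastest.
DELAYS = {"A": 0.3, "AA": 0.2, "AAC": 0.1}


def slow_lookup(ticker):
    time.sleep(DELAYS[ticker])  # simulate network latency
    return ticker, len(ticker)  # return the data instead of mutating globals


tickers = list(DELAYS)
with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order, regardless of which call finished first
    rows = list(executor.map(slow_lookup, tickers))

print(rows)  # [('A', 1), ('AA', 2), ('AAC', 3)]
```

Even though "AAC" finishes first, its result comes out last, because the iterator waits for the result of "A" before moving on.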

Change this part of your program - everything else can stay the same.

def do_something(ticker):
    print('---', ticker, '---')
    try:
        # The network request is the call that can fail, so it goes inside the try
        all_info = yf.Ticker(ticker).info
        a = all_info.get('ebitda')
        b = all_info.get('enterpriseValue')
        c = all_info.get('trailingPE')
        d = all_info.get('sector')
    except Exception:
        return None, None, None, None  # must return a 4-tuple
    return a, b, c, d

with concurrent.futures.ThreadPoolExecutor() as executer:
    for a, b, c, d in executer.map(do_something, tickers):
        col_a.append(a)  
        col_b.append(b)  
        col_c.append(c)  
        col_d.append(d)     

Upvotes: 1

Trevor Seibert

Reputation: 119

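Here is how to apply multithreading to the New Program, as suggested by @iudeen. The records are appended in whatever order the requests finish, but each record carries its own 'symbol', so every row stays consistent with its ticker.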

import pandas as pd
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor
import time
from stocks import tickers
start = time.time()



print('Loading Data... Please wait for results')
all_info = []
def create_df(ticker):
    all_info.append(yf.Ticker(ticker).info)
    
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(create_df, x) for x in tickers]

df = pd.DataFrame.from_records(all_info)
df = df[['symbol','ebitda', 'enterpriseValue', 'trailingPE', 'sector']]
df.dropna(inplace=True)
df['EV/Ratio'] = df['enterpriseValue'] / df['ebitda']
print(df)
print('It took', time.time()-start, 'seconds.')
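
One thing to watch with submit(): an exception raised inside a worker is stored on the Future and is only re-raised when result() is called, so in the code above a failed request would pass silently. A possible variation, sketched here with a hypothetical fetch_info function standing in for yf.Ticker(ticker).info, keeps track of which ticker each future belongs to and collects the failures:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_info(ticker):
    """Hypothetical stand-in for yf.Ticker(ticker).info."""
    if ticker == "BAD":
        raise ValueError(f"no data for {ticker}")
    return {"symbol": ticker, "ebitda": 1.0}


tickers = ["A", "AA", "BAD", "AAPL"]
all_info, failed = [], []

with ThreadPoolExecutor(max_workers=10) as executor:
    # Remember which ticker each future belongs to.
    future_to_ticker = {executor.submit(fetch_info, t): t for t in tickers}
    for future in as_completed(future_to_ticker):
        ticker = future_to_ticker[future]
        try:
            all_info.append(future.result())  # re-raises any worker exception
        except Exception as exc:
            failed.append((ticker, str(exc)))

print(sorted(row["symbol"] for row in all_info))  # ['A', 'AA', 'AAPL']
print(failed)                                     # [('BAD', 'no data for BAD')]
```

The list all_info can then be passed to pd.DataFrame.from_records() exactly as before, and failed shows which tickers need a retry.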

Upvotes: 0
