cmccall95

Reputation: 149

Correctly use multiprocessing

I am attempting to use multiprocessing for the first time and have to extract ~500,000 records (right now I have the variables set for 500). The original loop would have taken a very long time, so I am trying multiprocessing instead. Right now I have 10 processes running and it works, but it will still take around 4 hours to complete. I would like to run 20 or so processes, but I am afraid there may be a performance issue with my computer, and I would hate to wake up in the morning to see that the program crashed. Am I using this correctly, or is there a better way?

Full Code:

from pyETT import ett_parser
import pandas as pd
import time
from datetime import datetime
from multiprocessing import Process
import sys

c = 10
x1,y1 = 1,50
x2,y2 = 51,100
x3,y3 = 101,150
x4,y4 = 151,200
x5,y5 = 201,250
x6,y6 = 251,300
x7,y7 = 301,350
x8,y8 = 351,400
x9,y9 = 401,450
x10,y10 = 451,500
m_cols = ('user-name','elo','rank','wins','losses','last-online')

def run1():
    print('Running query 1...')
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x1, y1):
        try:
            if int(i) % int(c) == 0:
                print('Loop1 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_1:",i )

    #Export to excel
    file_name = 'export_file1_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run2():
    print('Running query2...')
    m_cols = ('user-name','elo','rank','wins','losses','last-online')
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x2, y2):
        try:
            if int(i) % int(c) == 0:
                print('Loop2 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_2:",i )

    #Export to excel
    file_name = 'export_file2_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')


def run3():
    print('Running query3...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x3, y3):
        try:
            if int(i) % int(c) == 0:
                print('Loop3 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_3:",i )

    #Export to excel
    file_name = 'export_file3_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run4():
    print('Running query4...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x4, y4):
        try:
            if int(i) % int(c) == 0:
                print('Loop4 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_4:",i )

    #Export to excel
    file_name = 'export_file4_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run5():
    print('Running query5...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x5, y5):
        try:
            if int(i) % int(c) == 0:
                print('Loop5 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_5:",i )

    #Export to excel
    file_name = 'export_file5_' + datetime.now().strftime("%H_%M_%S") +  '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run6():
    print('Running query6...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x6, y6):
        try:
            if int(i) % int(c) == 0:
                print('Loop6 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_6:",i )

    #Export to excel
    file_name = 'export_file6_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run7():
    print('Running query7...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x7, y7):
        try:
            if int(i) % int(c) == 0:
                print('Loop7 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_7:",i )

    #Export to excel
    file_name = 'export_file7_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run8():
    print('Running query8...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x8, y8):
        try:
            if int(i) % int(c) == 0:
                print('Loop8 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_8:",i )

    #Export to excel
    file_name = 'export_file8_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run9():
    print('Running query9...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x9, y9):
        try:
            if int(i) % int(c) == 0:
                print('Loop9 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_9:",i )

    #Export to excel
    file_name = 'export_file9_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')


def run10():
    print('Running query10...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x10, y10):
        try:
            if int(i) % int(c) == 0:
                print('Loop10 is at:', i)
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_10:",i )

    #Export to excel
    file_name = 'export_file10_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')
    
def main():

    
    p = Process(target=run1)
    p.start()
    #p.join()

    p2 = Process(target=run2)
    p2.start()

    p3 = Process(target=run3)
    p3.start()
    
    p4 = Process(target=run4)
    p4.start()

    p5 = Process(target=run5)
    p5.start()
    
    p6 = Process(target=run6)
    p6.start()

    p7 = Process(target=run7)
    p7.start()

    p8 = Process(target=run8)
    p8.start()

    p9 = Process(target=run9)
    p9.start()

    p10 = Process(target=run10)
    p10.start()
    p10.join()
    
if __name__ == '__main__':
    start = time.time()
    print('starting main')
    main()
    print('finishing main',time.time()-start)

Updated Code

Using swaggg's answer, this code does what I want and is much shorter.

from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from pyETT import ett_parser
import pandas as pd
import time

def main():
    USER_ID_COUNT = 50
    MAX_WORKERS = 2 * cpu_count() + 1
    dataframe_list = []

    user_ids = list(range(1, USER_ID_COUNT + 1))  # +1 so the last user id is included
 
    def obtain_user_record(user_id):
        return ett_parser.get_user(user_id)

    with ThreadPoolExecutor(MAX_WORKERS) as executor:
        for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
            if user_record:
                dataframe_list.append(user_record)

    df_master = pd.DataFrame.from_dict(dataframe_list, orient='columns')
    print(df_master)
    
if __name__ == '__main__':
    start = time.time()
    print('starting main')
    main()
    print('finishing main', time.time() - start)

Upvotes: 1

Views: 1243

Answers (2)

swaggg

Reputation: 480

Simply use the multiprocessing.Pool, which by default uses all of your cores and doesn't need any complicated management to make it operate at 100% efficiency. Spawning too many processes is not the way to go.

It will also make your code much neater, as you only need to provide it with one function and the data to map over.
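For illustration, here is a minimal sketch of the Pool approach (the 1-500 range and the error handling are placeholders; it assumes ett_parser.get_user returns one record per user id):

from multiprocessing import Pool
from pyETT import ett_parser

def fetch_user(user_id):
    # Return (user_id, record), or None on failure, so one bad id doesn't kill the run.
    try:
        return user_id, ett_parser.get_user(user_id)
    except Exception:
        print('Error:', user_id)
        return None

if __name__ == '__main__':
    with Pool() as pool:  # defaults to os.cpu_count() worker processes
        results = [r for r in pool.map(fetch_user, range(1, 501)) if r]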

Also:

You can use time.strftime("%H_%M_%S") instead of datetime.now().strftime("%H_%M_%S"); the datetime module is really slow.
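If you want to verify that on your own machine, a quick timeit comparison (exact numbers will vary):

import timeit

print(timeit.timeit('time.strftime("%H_%M_%S")', setup='import time'))
print(timeit.timeit('datetime.now().strftime("%H_%M_%S")',
                    setup='from datetime import datetime'))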

If you use primitives instead of pandas, especially just the basic Python open combined with str.split, or, less optimally, an xlsx reader/writer module, you will see a huge performance increase. Pandas is really slow. There are certainly many cases where its useful features make up for its slow performance, but if the only thing you need to do is extract some data from a csv or similar file and you're working with a huge dataset, you're better off not using it.

EDIT: I've had a look at the actual module you use, and it involves sending requests, whereas I thought you were reading the data from disk.

Btw, correct me if I'm wrong, but I think the ett. in ett.ett_parser.get_user(user_id) is wrong; the actual call should just be ett_parser.get_user(user_id).

Because your code involves sending multiple requests over the net and then writing the data to disk, you should probably use multithreading instead of multiprocessing. You can still use multiprocessing, but experiment with a different number of processes, because in this case the task is I/O bound and you should be trying to send more concurrent requests than there are logical cores in your CPU.

At the same time, pandas is slow like I said, and your other option is the xlsxwriter module, which is guaranteed to be faster (since you're really only using pandas to write the strings obtained from your json requests to an .xlsx file). You could also try the csv module, if you're fine with the csv format. But above all, the main bottleneck is going to be the requests, not the processing power it takes to write the data to disk.
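To give an idea, writing the collected rows with xlsxwriter directly could look roughly like this (a sketch; it assumes each record is a tuple in the same field order as the m_cols from your question):

import xlsxwriter

M_COLS = ('user-name', 'elo', 'rank', 'wins', 'losses', 'last-online')

def write_rows(file_name, rows):
    # rows: an iterable of tuples in M_COLS order (an assumption about your data)
    workbook = xlsxwriter.Workbook(file_name)
    worksheet = workbook.add_worksheet()
    worksheet.write_row(0, 0, M_COLS)  # header row
    for row_number, row in enumerate(rows, start=1):
        worksheet.write_row(row_number, 0, row)
    workbook.close()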

Secondly, Booboo's solution adapts your code to be used with the pool, which is great. But it may be more convenient not to split the data into chunks and then divide these chunks between processes. Instead, because the requests are the bottleneck, you can use multithreading and append the results to a global array. If you just use a multithreading map, you only need to pass it the user ids and the function. Therefore, you could do the following:

from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from pyETT import ett_parser

USER_ID_COUNT = 50000
MAX_WORKERS = 2 * cpu_count() + 1

user_array = []  # Can be adapted to pandas or the xlsxwriter module
user_ids = list(range(1, USER_ID_COUNT))

def obtain_user_record(user_id):
    return ett_parser.get_user(user_id)

with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
        user_array.append(user_record)

Then, you would write the array to disk, splitting it as you like, and you could certainly use multiprocessing for that if you felt it was worth it.
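A minimal sketch of that last step, assuming csv output is acceptable, each record is a dict keyed by the m_cols field names, and the chunk size is an arbitrary placeholder:

import csv

M_COLS = ('user-name', 'elo', 'rank', 'wins', 'losses', 'last-online')
CHUNK_SIZE = 10000  # placeholder; split however you like

def write_chunks(user_array, chunk_size=CHUNK_SIZE):
    # Writes user_array to disk as export_file1.csv, export_file2.csv, ...
    for chunk_number, start in enumerate(range(0, len(user_array), chunk_size), start=1):
        with open(f'export_file{chunk_number}.csv', 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=M_COLS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(user_array[start:start + chunk_size])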

EDIT2:

Okay, in that case use Pandas. But note that the documentation page for pandas.DataFrame.append states:

Iteratively appending rows to a DataFrame can be more computationally intensive than a single concatenate. A better solution is to append those rows to a list and then concatenate the list with the original DataFrame all at once.

Oh, and by the way, I think the pyETT function returns a dataframe and not a string, is that right? I'm not quite sure as to the indexing of the return values, but you can figure that out. Do this:

dataframe_list = []

with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
        if user_record:
            dataframe_list.append(user_record)
        # If the dataframes aren't indexed, you could create a pre-filled
        # dataframe_list and overwrite specific indices instead of appending,
        # then pass ignore_index=True to pd.concat.

df_master = pd.concat(dataframe_list)  # We're done: merge our dframes into one big dframe.

EDIT3: Okay, then we need to convert the data to a pandas dataframe.

user_df = pd.DataFrame.from_dict({user_id: user_record}, orient='index', columns=m_cols)
dataframe_list.append(user_df)

This way, the frames will be indexed by the user ids we pass to the function earlier. And of course, you could have multiple lists and merge them into multiple dataframes, then just write the dataframes to separate .xls files or you could go for some other approach.

EDIT4: Interesting that it lets you use pd.DataFrame.from_dict on a list of dicts. The correct function to call would be pd.DataFrame.from_records instead, and you don't really need to pass orient='index' to it. What's the advantage of converting the dicts to dataframes and then merging them? Well, you can control the indexing, like I showed in EDIT3. You could accomplish the same with:

df_master = pd.DataFrame.from_records(dataframe_list, index=index_list)
df_master = df_master.sort_index()

Here index_list is a list of indices (that is, user_ids) in the same order as dataframe_list.
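Concretely, you could collect the ids and the records in parallel inside the executor loop (a sketch building on the snippet above, reusing its user_ids, MAX_WORKERS and obtain_user_record):

dataframe_list, index_list = [], []

with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
        if user_record:
            dataframe_list.append(user_record)
            index_list.append(user_id)  # stays in the same order as dataframe_list

df_master = pd.DataFrame.from_records(dataframe_list, index=index_list).sort_index()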

You can also make a dict instead of a list:

dataframe_dict = {}

with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
        if user_record:
            dataframe_dict[user_id] = user_record

Then the correct function to call is from_dict, and you can once again index the records like so:

df_master = pd.DataFrame.from_dict(dataframe_dict, orient='index')
df_master = df_master.sort_index()

Not having to convert the dicts to dataframes each time is better performance-wise, so good call.

Upvotes: 3

Booboo

Reputation: 44023

Update

First of all, when you process ranges range(1, 50), range(51, 100), range(101, 150), etc., the actual values that user_id will take on are:

1, 2, ... 49, 51, 52, ... 99, 101, 102, ... 149

That is, you will not be processing values 50, 100, 150, etc., because given range(start, stop, step), for a positive step the contents of a range r are determined by the formula r[i] = start + step*i where i >= 0 and r[i] < stop.

I question whether it was your intention to omit those values. I will assume that it was not your intention.
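A quick demonstration of the off-by-one:

print(list(range(1, 50))[-1])  # 49 -- user id 50 is never processed
print(list(range(1, 51))[-1])  # 50 -- stop must be one past the last value you want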

It appears that processing can be divided into an I/O-bound portion (ett_parser.get_user(user_id), which retrieves a URL) and a more CPU-intensive portion (creation of the pandas dataframe and exporting to Excel). For the former a large multithreading pool should be used, and for the latter a multiprocessing pool should be used.

The following code creates both pools and passes the multiprocessing pool to the worker function run, which retrieves all the URLs specified in its passed range and then passes the list of lines thus retrieved to the multiprocessing pool for assembly and conversion to Excel.

from pyETT import ett_parser
import pandas as pd
import time
from datetime import datetime
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from functools import partial

def run(multiprocessing_pool, run_number, the_range):
    c = 10

    print(f'Running query {run_number}...', flush=True)
    lines = []
    for user_id in the_range:
        try:
            if user_id % c == 0:
                print('Loop is at:', user_id, flush=True)
            line = ett_parser.get_user(user_id)
            lines.append((user_id, line))
        except Exception:
            print("Error:", user_id, flush=True)
    if lines:
        multiprocessing_pool.apply(process_lines, args=(run_number, lines))

def process_lines(run_number, lines):
    m_cols = ('user-name','elo','rank','wins','losses','last-online')

    df_master = pd.DataFrame(columns = m_cols)
    for user_id, line in lines:
        temp_df = pd.DataFrame(line, index=[user_id])
        df_master = df_master.append(temp_df, ignore_index=True)
    #Export to excel
    file_name = f'export_file{run_number}_{datetime.now().strftime("%H_%M_%S")}.xlsx'
    df_master.to_excel(file_name, index=False)
    print(f'DataFrame({file_name}) is written to Excel File successfully.', flush=True)

def split(lst, n):  # function to split list in n even parts
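    # e.g. split(range(1, 11), 3) -> [range(1, 5), range(5, 8), range(8, 11)]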
    k, m = divmod(len(lst), n)
    return list(lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def main():
    MIN_USER_ID = 1
    MAX_USER_ID = 500
    NUM_SPLITS = 10 # 50 user ids per split
    THREAD_POOL_SIZE = min(NUM_SPLITS, 100) # Or maybe even 500!
    # split this range into thread_pool_size smaller ranges:
    # Next line will process 500 user ids: 1, 2, ... 500
    ranges = split(range(MIN_USER_ID, MAX_USER_ID+1), NUM_SPLITS)
    multiprocessing_pool = Pool()
    multithreading_pool = ThreadPool(THREAD_POOL_SIZE)
    multithreading_pool.starmap(partial(run, multiprocessing_pool) , enumerate(ranges, start=1))

if __name__ == '__main__':
    start = time.time()
    print('starting main')
    main()
    print('finishing main', time.time() - start)

Upvotes: 1
