smackenzie

Reputation: 3022

Python multiprocessing slower processing a list, even when using shared memory

How can a non-parallelised version of this code run much faster than the parallel version, when I am using a Manager object to share a list across processes? I used the Manager to avoid any serialisation, and I don't need to edit the list.

I return an 800,000-row data set from Oracle, convert it to a list, and store it in shared memory using Manager.list().

I am iterating over each column in the query results, in parallel, to obtain some statistics (I know I could do it in SQL).

Main code:

import cx_Oracle
import csv
import os
import glob
import datetime
import multiprocessing as mp
import get_column_stats as gs;
import pandas as pd
import pandas.io.sql as psql


def get_data():
    print("Starting Job: " + str(datetime.datetime.now()));
    manager = mp.Manager()

    # Step 1: Init multiprocessing.Pool()   
    pool = mp.Pool(mp.cpu_count())
    print("CPU Count: " + str(mp.cpu_count()))

    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='PARIELGX');
    con = cx_Oracle.connect(user='fred', password='password123', dsn=dsn_tns);


    stats_results = [["OWNER","TABLE","COLUMN_NAME","RECORD_COUNT","DISTINCT_VALUES","MIN_LENGTH","MAX_LENGTH","MIN_VAL","MAX_VAL"]];

    sql = "SELECT * FROM ARIEL.DIM_REGISTRATION_SET"

    cur = con.cursor();

    print("Start Executing SQL: " + str(datetime.datetime.now()));
    cur.execute(sql);
    print("End SQL Execution: " + str(datetime.datetime.now()));

    print("Start SQL Fetch: " + str(datetime.datetime.now()));
    rs = cur.fetchall();
    print("End SQL Fetch: " + str(datetime.datetime.now()));

    print("Start Creation of Shared Memory List: " + str(datetime.datetime.now()));    
    lrs = manager.list(list(rs)) # shared memory list
    print("End Creation of Shared Memory List: " + str(datetime.datetime.now()));

    col_names = [];
    for field in cur.description:   
        col_names.append(field[0]); 

    print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()));

    # iterate through each column, to gather stats from each column using parallelisation
    pool_results = pool.map_async(gs.get_column_stats_rs, [(lrs, col_name, col_names) for col_name in col_names]).get()

    for i in pool_results:
        stats_results.append(i)

    # Step 3: Don't forget to close
    pool.close() 


    print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()));
    # end filename for
    cur.close();           

    outfile = open(r'C:\jupyter\Experiment\stats_dim_registration_set.csv','w');
    writer=csv.writer(outfile,quoting=csv.QUOTE_ALL, lineterminator='\n');
    writer.writerows(stats_results);
    outfile.close()
    print("Ending Job: " + str(datetime.datetime.now()));





if __name__ == "__main__":
    get_data();

Code being called in parallel:

import sys

def get_column_stats_rs(args):
    # rs is a list recordset of the results
    rs, col_name, col_names = args

    col_index = col_names.index(col_name)

    sys.stdout = open("col_" + col_name + ".out", "a")

    print("Starting Iteration of Column: " + col_name)
    max_length = 0
    min_length = 100000  # arbitrarily large starting value

    max_value = ""
    min_value = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"  # arbitrarily "large" string, so any real value compares smaller

    distinct_value_count = 0

    has_values = False  # does the column have any non-null values
    has_null_values = False

    row_count = 0

    # create a dictionary into which we can add the individual items present in each row of data
    # a dictionary will not let us add the same value more than once, so we can simply count the
    # dictionary values at the end
    distinct_values = {}

    row_index = 0

    # go through every row, for the current column being processed to gather the stats
    for val in rs:
        row_value = val[col_index]

        row_count += 1

        if row_value is None:
            value_length = 0
        else:
            value_length = len(str(row_value))

        if value_length > max_length:
            max_length = value_length

        if value_length < min_length:
            if value_length > 0:
                min_length = value_length

        if row_value is not None:
            if str(row_value) > max_value:
                max_value = str(row_value)
            if str(row_value) < min_value:
                min_value = str(row_value)

        # capture distinct values
        if row_value is None:
            row_value = "Null"
            has_null_values = True
        else:
            has_values = True
            distinct_values[row_value] = 1

        row_index += 1
        # end row for

    distinct_value_count = len(distinct_values)

    if not has_values:
        distinct_value_count = None
        min_length = None
        max_length = None
        min_value = None
        max_value = None
    elif has_null_values and distinct_value_count > 0:
        distinct_value_count -= 1

    if min_length == 0 and max_length > 0 and has_values:
        min_length = max_length

    print("Ending Iteration of Column: " + col_name)

    return ["ARIEL", "DIM_REGISTRATION_SET", col_name, row_count, distinct_value_count, min_length, max_length,
        strip_crlf(str(min_value)), strip_crlf(str(max_value))]

Helper function:

def strip_crlf(value):
    return value.replace('\n', ' ').replace('\r', '')

I am using a Manager.list() object to share state across the processes:

lrs = manager.list(list(rs)) # shared memory list

And I am passing the list to the map_async() method:

pool_results = pool.map_async(gs.get_column_stats_rs, [(lrs, col_name, col_names) for col_name in col_names]).get()

Upvotes: 3

Views: 499

Answers (1)

altunyurt

Reputation: 2946

The Manager's overhead is adding to your runtime. You're also not actually using shared memory here: a multiprocessing manager keeps the list in a separate server process, so every element access from a worker is a round trip to that process, which is known to be slower than shared memory or even a single-threaded implementation. If you don't need synchronisation in your code, meaning you're not modifying the shared data, just skip the manager and use shared memory objects directly.

https://docs.python.org/3.7/library/multiprocessing.html

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
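
For a row set of mixed-type tuples, the standard shared-memory objects (Value/Array, or multiprocessing.shared_memory added in Python 3.8) only hold flat primitive data, so when the workers only read the rows another way to drop the manager entirely is to hand each worker its own reference to the rows once, through the Pool initializer. Below is a minimal sketch of that pattern, not your code: the worker here just counts distinct values per column to keep the example short, standing in for your get_column_stats_rs.

import multiprocessing as mp

rows_in_worker = None  # set once in each worker process by the pool initializer


def init_worker(rows):
    # Runs once per worker when the pool starts. With the fork start method the
    # rows are shared copy-on-write; with spawn they are pickled once per worker
    # rather than once per task or once per element.
    global rows_in_worker
    rows_in_worker = rows


def count_distinct(col_index):
    # Plain local reads -- no proxy object, no round trips to a manager process.
    return col_index, len({row[col_index] for row in rows_in_worker})


def column_stats(rows, col_names):
    with mp.Pool(mp.cpu_count(), initializer=init_worker, initargs=(rows,)) as pool:
        return pool.map(count_distinct, range(len(col_names)))


if __name__ == "__main__":
    # Stand-in for the Oracle result set (rs = cur.fetchall() in the question).
    rows = [(i % 7, "name_" + str(i % 3)) for i in range(100000)]
    print(column_stats(rows, ["ID", "NAME"]))

With fork (Linux) the workers share the parent's rows copy-on-write; with spawn (Windows) each worker gets one pickled copy at start-up, which is still far cheaper than a manager round trip per element.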

Upvotes: 1
