GoingMyWay

Reputation: 17478

Python, how to implement parallel processing

In Python, a simple SELECT query against a very large database takes a long time. I have a table with 4,700,000 records, and if I use SELECT * FROM MY_TABLE to fetch all of the data, it takes about 18 minutes. By setting a chunk size and running the queries in parallel, I hope to save time.

So, my code is:

import os
import time
import multiprocessing
import pandas as pd
import MySQLdb as mysql

if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                        )
    limit = 100000
    offset = 0
    dfs = []
    print 'start.....'
    _s = time.time()
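    # Pull the table in LIMIT/OFFSET chunks until a short (final) chunk comes back.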
    while True:
        _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
                (limit, offset)
        dfs.append(pd.read_sql(_query, conn))
        offset += limit
        if len(dfs[-1]) < limit:
            break
    _e = time.time()
    print 'Time: ', _e - _s
    full_df = pd.concat(dfs)

But it still takes about 10 minutes. How can I parallelize it, so that many workers run at the same time and the total execution time comes down to roughly the time of a single chunk? Here is my multiprocessing attempt:

def select(info):
    """"""
    limit, offset, conn = info[0], info[1], info[2]
    _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
            (limit, offset)
    s = time.time()
    info[3].append(pd.read_sql(_query, conn))
    e = time.time()
    print 'time: ', e - s, ' pid: ', os.getpid()

if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                        )
    dfs, p, pool = [], [], multiprocessing.Pool(7)
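    # Each tuple is (limit, offset, shared connection, shared result list).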
    info = [(1000000, 0, conn, dfs),
            (1000000, 1000000, conn, dfs),
            (1000000, 2000000, conn, dfs),
            (1000000, 3000000, conn, dfs),
            (1000000, 4000000, conn, dfs),
            (1000000, 5000000, conn, dfs),
            (1000000, 6000000, conn, dfs),
           ]
    for _i, _v in enumerate(info):
        print 'start....', _i
        _p = multiprocessing.Process(target=select, args=(_v, ))
        _p.start()
        _p.join()
    print 'The End'

As you can see, although it launches multiple processes, only one process reads the database at a time, so this is sequential multiprocessing, not parallel processing.

How can I make the processes actually run in parallel to save time? Thanks.

Upvotes: 2

Views: 2957

Answers (1)

sasmith

Reputation: 445

In your loop

for _i, _v in enumerate(info):
    print 'start....', _i
    _p = multiprocessing.Process(target=select, args=(_v, ))
    _p.start()
    _p.join()

you're starting processes and then joining on them immediately. This means that your main process never has more than one subprocess running at a time: as soon as it launches one, it waits for that one to finish before launching the next.

The most direct way to fix this would be something like:

processes = []
for _i, _v in enumerate(info):
    print 'start....', _i
    _p = multiprocessing.Process(target=select, args=(_v, ))
    _p.start()
    processes.append(_p)
for _p in processes:
    _p.join()

However, a better way would be to use the pool object that you already created. Note that pool.apply(select, info) would unpack info into seven positional arguments and block on each call, so use the asynchronous form, something like

for _v in info:
    pool.apply_async(select, (_v,))
pool.close()
pool.join()

However, I think you'd be happier making select return the data it gets (instead of appending it to a shared list) and calling pool.map instead. That avoids the race conditions and shared-memory problems you'd otherwise run into: a plain Python list and a MySQL connection are not shared with subprocesses, so the appends made in the children would never reach the parent's dfs, and the connection can't be pickled to send it to the workers in the first place.
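A rough, untested sketch of that pool.map version, reusing the host, credentials, and table from your question, and letting each worker open its own connection since the connection can't be shared:

import multiprocessing
import pandas as pd
import MySQLdb as mysql

def select(info):
    """Read one (limit, offset) chunk and return it as a DataFrame."""
    limit, offset = info
    # Each worker opens its own connection; connections cannot be
    # pickled and handed to a subprocess.
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8')
    try:
        query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % (limit, offset)
        return pd.read_sql(query, conn)
    finally:
        conn.close()

if __name__ == '__main__':
    limit = 1000000
    chunks = [(limit, i * limit) for i in range(7)]
    pool = multiprocessing.Pool(7)
    # map blocks until every chunk has been read and returns the
    # DataFrames in the same order as the (limit, offset) pairs.
    dfs = pool.map(select, chunks)
    pool.close()
    pool.join()
    full_df = pd.concat(dfs)
    print 'rows:', len(full_df)

Because select returns its DataFrame, pool.map collects the results in the parent for you, and there is no shared state to coordinate.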

You can read more about these functions at https://docs.python.org/2/library/multiprocessing.html, although I expect you've already been there.

Upvotes: 2
