Reputation: 17478
In Python, if the database is very large, a simple SELECT query takes a long time. I have a table with 4,700,000 records, and if I use SELECT * FROM MY_TABLE
to fetch all of the data, it takes 18 minutes. By setting a chunk size
and running the queries in parallel, I hope to cut that time down.
So, my code is:
import os
import time
import multiprocessing
import pandas as pd
import MySQLdb as mysql

if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                         )
    limit = 100000
    offset = 0
    dfs = []
    print 'start.....'
    _s = time.time()
    while True:
        _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % \
                 (limit, offset)
        dfs.append(pd.read_sql(_query, conn))
        offset += limit
        if len(dfs[-1]) < limit:
            break
    _e = time.time()
    print 'Time: ', _e - _s
    full_df = pd.concat(dfs)
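For reference, the chunk_size I mentioned can also be handled by pandas itself through the chunksize argument of read_sql, which then returns an iterator of DataFrames instead of one big frame; a roughly equivalent sketch using the same table and connection as above:

_s = time.time()
chunks = pd.read_sql('SELECT * FROM A_stock_basic', conn, chunksize=100000)
full_df = pd.concat(list(chunks))   # stitch the chunks back into one DataFrame
print 'Time: ', time.time() - _s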
But it still takes about 10 minutes. How can I parallelize it, so that many workers run at the same time and the total execution time drops to roughly the time of a single chunk? I have the multiprocessing code here:
def select(info):
    """Fetch one chunk of rows and append it to the shared list."""
    limit, offset, conn = info[0], info[1], info[2]
    _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % \
             (limit, offset)
    s = time.time()
    info[3].append(pd.read_sql(_query, conn))
    e = time.time()
    print 'time: ', e - s, ' pid: ', os.getpid()
if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                         )
    dfs, p, pool = [], [], multiprocessing.Pool(7)
    info = [(1000000, 0, conn, dfs),
            (1000000, 1000000, conn, dfs),
            (1000000, 2000000, conn, dfs),
            (1000000, 3000000, conn, dfs),
            (1000000, 4000000, conn, dfs),
            (1000000, 5000000, conn, dfs),
            (1000000, 6000000, conn, dfs),
            ]
    for _i, _v in enumerate(info):
        print 'start....', _i
        _p = multiprocessing.Process(target=select, args=(_v, ))
        _p.start()
        _p.join()
    print 'The End'
As you can see, although it launches multiple processes, only one process reads the database at a time, so the work is still sequential rather than parallel.
How can I make the processes actually run in parallel to save time? Thanks.
Upvotes: 2
Views: 2957
Reputation: 445
In your loop
for _i, _v in enumerate(info):
    print 'start....', _i
    _p = multiprocessing.Process(target=select, args=(_v, ))
    _p.start()
    _p.join()
you're starting each process and then joining on it immediately. This means that your main process never has more than one subprocess running at a time: as soon as it launches one, it waits for that one to complete before launching the next.
The most direct way to fix this would be something like:
processes = []
for _i, _v in enumerate(info):
    print 'start....', _i
    _p = multiprocessing.Process(target=select, args=(_v, ))
    _p.start()
    processes.append(_p)

for _p in processes:
    _p.join()
However, a better way would be to use the pool
object that you already created. I think you'd be happier making select
return the DataFrame it reads (instead of appending it to dfs: each child process appends to its own copy of that list, so the parent never sees the results) and then calling pool.map
over the list of work items. pool.apply
would not help here, because it blocks until each call finishes and so serializes the work again, much like the immediate join did. Returning the data through pool.map also avoids the shared-memory and connection-sharing problems that I think you'd otherwise run into.
You can read more about these functions at https://docs.python.org/2/library/multiprocessing.html, although I expect you've already been there.
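For concreteness, here is a minimal sketch of that pool.map approach. It assumes each worker opens its own MySQL connection (a connection object generally cannot be shared between, or pickled into, worker processes) and it reuses the table and credentials from your question:

import multiprocessing
import pandas as pd
import MySQLdb as mysql

def select(args):
    """Worker: open its own connection, fetch one chunk, return the DataFrame."""
    limit, offset = args
    conn = mysql.connect(host='192.168.0.114', user='root',
                         passwd='fit123456', db='A_stock_day',
                         charset='utf8')
    try:
        query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % (limit, offset)
        return pd.read_sql(query, conn)
    finally:
        conn.close()

if __name__ == '__main__':
    chunk = 1000000
    jobs = [(chunk, i * chunk) for i in range(7)]   # (limit, offset) pairs covering ~4.7M rows
    pool = multiprocessing.Pool(7)
    try:
        dfs = pool.map(select, jobs)                # the 7 queries run in parallel workers
    finally:
        pool.close()
        pool.join()
    full_df = pd.concat(dfs)
    print 'rows: ', len(full_df)

Keep in mind that each chunk is pickled back to the parent process, and the speedup depends on whether the MySQL server and its disk can actually serve several scans at once. LIMIT/OFFSET paging also gets slower at large offsets, so paging on an indexed key range is usually faster if you have one.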
Upvotes: 2