Reputation: 2521
Another confused parallel coder here!
Our internal Hive database has an API layer to which we need to use to access the data. There is a 300 second query timeout limit, so I wanted to use multiprocessing to execute multiple queries in parallel:
from multiprocessing import Pool
import pandas as pd
import time
from hive2pandas_anxpy import Hive2Pandas # custom module for querying our Hive db and converting the results to a Pandas dataframe
import datetime
def run_query(hour):
start_time = time.time()
start_datetime = datetime.datetime.now()
query = """SELECT id, revenue from table where date='2014-05-20 %s' limit 50""" % hour
h2p = Hive2Pandas(query, 'username')
h2p.run()
elapsed_time = int(time.time() - start_time)
end_datetime = datetime.datetime.now()
return {'query':query, 'start_time':start_datetime, 'end_time':end_datetime, 'elapsed_time':elapsed_time, 'data':h2p.data_df}
if __name__ == '__main__':
start_time = time.time()
pool = Pool(4)
hours = ['17','18','19']
results = pool.map_async(run_query, hours)
pool.close()
pool.join()
print int(time.time() - start_time)
The issue I'm having is that one of the queries always returns no data, but when I run the same query in the usual fashion, it returns data. Since I'm new to multiprocessing, I'm wondering if there are there any obvious issues with how I'm using it above?
Upvotes: 1
Views: 1319
Reputation: 6160
I think the issue you are having is that the results object is not ready by the time you want to use it. Also if you have a known amount of time for a timeout, I would suggest using that to your advantage in the code.
This code shows an example of how you can force a timeout after 300 seconds if the results from all of them are not collected by then.
if __name__ == '__main__':
start_time = time.time()
hours = ['17','18','19']
with Pool(processes=4) as pool:
results = pool.map_async(run_query, hours)
print(results.get(timeout=300))
print int(time.time() - start_time)
Otherwise you should still be using results.get()
to return your data, or specify a callback function for map_async
.
Upvotes: 1