ChrisArmstrong

Reputation: 2521

Multiprocessing pool returning wrong results

Another confused parallel coder here!

Our internal Hive database has an API layer which we need to use to access the data. There is a 300-second query timeout limit, so I wanted to use multiprocessing to execute multiple queries in parallel:

from multiprocessing import Pool
import pandas as pd
import time
from hive2pandas_anxpy import Hive2Pandas   # custom module for querying our Hive db and converting the results to a Pandas dataframe
import datetime

def run_query(hour):
    start_time = time.time()
    start_datetime = datetime.datetime.now()

    query = """SELECT id, revenue from table where date='2014-05-20 %s' limit 50""" % hour
    h2p = Hive2Pandas(query, 'username')
    h2p.run()

    elapsed_time = int(time.time() - start_time)
    end_datetime = datetime.datetime.now()

    return {'query':query, 'start_time':start_datetime, 'end_time':end_datetime, 'elapsed_time':elapsed_time, 'data':h2p.data_df}

if __name__ == '__main__':

    start_time = time.time()
    pool = Pool(4)
    hours = ['17','18','19']
    results = pool.map_async(run_query, hours)
    pool.close()
    pool.join()
    print int(time.time() - start_time)

The issue I'm having is that one of the queries always returns no data, but when I run the same query in the usual fashion, it returns data. Since I'm new to multiprocessing, I'm wondering if there are any obvious issues with how I'm using it above?

Upvotes: 1

Views: 1319

Answers (1)

CasualDemon

Reputation: 6160

I think the issue you are having is that the results object is not ready by the time you want to use it. Also, since you have a known timeout, I would suggest using it to your advantage in the code.

This code shows an example of how you can force a timeout after 300 seconds if the results from all of the queries have not been collected by then.

if __name__ == '__main__':
    start_time = time.time()
    hours = ['17','18','19']

    with Pool(processes=4) as pool: 
        results = pool.map_async(run_query, hours)
        print(results.get(timeout=300))

    print(int(time.time() - start_time))

Otherwise you should still call results.get() to retrieve your data, or specify a callback function for map_async.
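For the callback route, here is a minimal sketch, assuming run_query and hours are the same as in the question; the callback receives the full list of result dictionaries once every worker has finished:

from multiprocessing import Pool

def collect(result_list):
    # runs once in the parent process with the list of all results
    for result in result_list:
        print(result['query'], result['elapsed_time'])

if __name__ == '__main__':
    hours = ['17', '18', '19']

    with Pool(processes=4) as pool:
        async_result = pool.map_async(run_query, hours, callback=collect)
        async_result.wait(timeout=300)   # block until done, or give up after 300 seconds

Note that leaving the with block terminates the pool, so if the timeout expires before all the queries finish, the callback never fires.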

Upvotes: 1
