Reputation: 1
Environment: AWS Batch job running a Python script that uses multiprocessing (fork start method), boto3/DynamoDB, and pandas; the same script also runs locally without multiprocessing.
Key Points:
- Each worker process queries a DynamoDB GSI (paginated) for one partner / loan-type / date-range batch and writes the results to per-process CSV chunks.
- The chunks are merged into the final report after the pool finishes.
- Run locally without multiprocessing the report is always complete; on AWS Batch with multiprocessing it is occasionally empty.
Issue: In the AWS Batch environment with multiprocessing, the script occasionally produces empty reports. The same script run locally without multiprocessing always succeeds.
Relevant code snippet:
import multiprocessing

import pandas as pd
from boto3.dynamodb.conditions import Key

# partner_loan_table, projection_item_string, num_processes and batch are
# defined elsewhere in the script (see the setup sketch below the snippet).


def _f(batch):
    # batch is (partner_tag, loan_type, (start_date, end_date))
    process_id = multiprocessing.current_process().pid
    print(f'Start Process ID: {process_id}, date range: {batch[2]}')
    partner_tag = batch[0]
    loan_type = batch[1]
    dates = batch[2]
    partner_loan_type_tag = partner_tag + '_' + loan_type
    done = False
    start_key = None
    i = 0
    total_loans = 0
    while not done:
        i += 1
        # Paginate through the GSI; ExclusiveStartKey is only set after the first page.
        query_kwargs = dict(
            IndexName="partner_loan_type_tag-disbursement_date-index",
            ProjectionExpression=projection_item_string,
            KeyConditionExpression=Key('partner_loan_type_tag').eq(partner_loan_type_tag)
            & Key('disbursement_date').between(dates[0], dates[1]),
        )
        if start_key:
            query_kwargs['ExclusiveStartKey'] = start_key
        disbursement_range_loans_temp = partner_loan_table.query(**query_kwargs)

        disbursement_range_loan_items = disbursement_range_loans_temp.get('Items', [])
        total_loans += len(disbursement_range_loan_items)
        df = pd.json_normalize(disbursement_range_loan_items, sep='*')
        if not df.empty:
            # One CSV chunk per process and per page; the chunks are merged later.
            csv_name = f'{process_id}_{dates[0]}_{dates[1]}_{i}.csv'
            df.to_csv(csv_name, index=False)

        start_key = disbursement_range_loans_temp.get('LastEvaluatedKey', None)
        done = start_key is None
    print(f'END Process ID: {process_id}, date range: {batch[2]}, total loans: {total_loans}')


with multiprocessing.get_context("fork").Pool(processes=num_processes) as pool:
    pool.map(_f, batch)
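For context, the snippet assumes module-level setup roughly like the following. The names (partner_loan_table, projection_item_string, num_processes, batch) come from the code above, but the concrete table name, projection fields, process count, and batch tuples are placeholders, not the real values from my script:

    # Assumed module-level setup (illustrative only).
    import boto3

    dynamodb = boto3.resource('dynamodb')
    partner_loan_table = dynamodb.Table('partner_loan')            # placeholder table name
    projection_item_string = 'loan_id, disbursement_date, amount'  # placeholder projection

    num_processes = 4
    # Each work item is (partner_tag, loan_type, (start_date, end_date)),
    # matching how _f unpacks batch[0], batch[1], batch[2].
    batch = [
        ('partner_a', 'personal', ('2024-01-01', '2024-01-31')),
        ('partner_a', 'personal', ('2024-02-01', '2024-02-29')),
    ]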
Then I merge all the CSV files created by _f to build the final report (a sketch of that merge step is below).
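The merge step is not in the snippet; a minimal sketch of how the per-process CSV chunks could be combined, assuming they all sit in the working directory and share the same columns, is:

    # Minimal merge sketch (assumption: all chunk CSVs are in the current
    # working directory and no other *_*.csv files are present).
    import glob

    import pandas as pd

    chunk_files = glob.glob('*_*.csv')   # per-process, per-page chunks written by _f
    frames = [pd.read_csv(path) for path in chunk_files]
    report = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    report.to_csv('final_report.csv', index=False)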
What I've tried:
Upvotes: 0
Views: 26