Ashik B N

Reputation: 1

AWS Batch Python job with multiprocessing intermittently produces empty reports

Environment:

Key Points:

  1. The issue occurs in AWS Batch, but not consistently: sometimes the job generates the report, sometimes it generates an empty report.
  2. When run locally WITHOUT multiprocessing, the script always succeeds for the same date range.
  3. The AWS Batch job uses multiprocessing with the 'fork' context.

Issue: In the AWS Batch environment with multiprocessing, the script occasionally produces empty reports. The same script run locally without multiprocessing always succeeds.

Relevant code snippet:

import multiprocessing

import pandas as pd
from boto3.dynamodb.conditions import Key

def _f(batch):
    process_id = multiprocessing.current_process().pid
    print(f'Start Process ID: {process_id}, date range: {batch[2]}')
    partner_tag, loan_type, dates = batch
    partner_loan_type_tag = partner_tag + '_' + loan_type
    done = False
    start_key = None
    i = 0
    total_loans = 0
    while not done:
        i += 1
        # Paginate through the GSI; ExclusiveStartKey is only valid after the first page.
        query_kwargs = {
            'IndexName': "partner_loan_type_tag-disbursement_date-index",
            'ProjectionExpression': projection_item_string,
            'KeyConditionExpression': Key('partner_loan_type_tag').eq(partner_loan_type_tag)
            & Key('disbursement_date').between(dates[0], dates[1]),
        }
        if start_key:
            query_kwargs['ExclusiveStartKey'] = start_key
        disbursement_range_loans_temp = partner_loan_table.query(**query_kwargs)
        disbursement_range_loan_items = disbursement_range_loans_temp.get('Items', [])
        total_loans += len(disbursement_range_loan_items)
        df = pd.json_normalize(disbursement_range_loan_items, sep='*')
        if not df.empty:
            # One CSV per process / date range / page; these are merged later into the report.
            csv_name = f'{process_id}_{dates[0]}_{dates[1]}_{i}.csv'
            df.to_csv(csv_name, index=False)
        start_key = disbursement_range_loans_temp.get('LastEvaluatedKey', None)
        done = start_key is None
    print(f'END Process ID: {process_id}, date range: {batch[2]}, total loans: {total_loans}')
    return


with multiprocessing.get_context("fork").Pool(processes=num_processes) as pool:
    pool.map(_f, batch)
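For illustration, the `batch` argument passed to `pool.map` is a list of work items, each shaped like `(partner_tag, loan_type, (start_date, end_date))` to match the indexing inside `_f`. A minimal sketch of how it might be built (the partner tags, loan types, and dates here are assumed placeholders, not the real values):

```python
from itertools import product

# Assumed placeholder values for illustration only.
partner_tags = ["partnerA", "partnerB"]
loan_types = ["personal", "business"]
date_ranges = [("2024-01-01", "2024-01-31"), ("2024-02-01", "2024-02-29")]

# One work item per (partner, loan type, date range) combination.
batch = [(p, lt, dr) for p, lt, dr in product(partner_tags, loan_types, date_ranges)]
print(len(batch))  # 8 work items
```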

Afterwards I merge all the CSV files created by the function (_f) to get the final report.
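The merge step looks roughly like this (a sketch, assuming the per-process CSVs sit in the working directory and follow the `pid_start_end_page.csv` naming used in `_f`; `final_report.csv` is an assumed output name):

```python
import glob

import pandas as pd

# Collect every per-process CSV written by _f and concatenate into one report.
csv_files = sorted(glob.glob("*_*.csv"))
frames = [pd.read_csv(f) for f in csv_files]
report = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
report.to_csv("final_report.csv", index=False)
```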

What I've tried:

  1. Verified data exists in DynamoDB for the given date range
  2. Checked for DynamoDB throttling or capacity issues (the table uses on-demand capacity)
  3. Verified AWS credentials and permissions
  4. Tested locally without multiprocessing (always succeeds)

Upvotes: 0

Views: 26

Answers (0)
