Sanjana Krishnan

Reputation: 11

Unable to zip a .txt file using pyspark

I am trying to generate a file for a downstream consumer: we fetch data from an Oracle table and write a .txt.gz file to AWS S3. The idea is as follows -

  1. Generate multiple CSV files, which get written into a single .txt file.
  2. Zip this .txt file to produce a .txt.gz file to send to the consumer.

I am able to get through step 1, but I cannot figure out step 2.

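# Module-level setup assumed elsewhere in the Glue script (not shown in this snippet):
# import zipfile
# from io import BytesIO
# import boto3
# s3 = boto3.resource('s3')
# CONTEXT_NAME, STRDATE, FOLDER_PATH, TARGET_FILE, EXEC_ENVIRONMENT are module-level constants.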
def execute_script(environment: str,logger, input_json_path: str = '', isdebug: bool = False):  
    print('Started')  
    config = ConfigurationManager(env=environment).oraclerds_conf  

    sqlSelect = get_bcbsa_value_based_pgms_df(config)  
    logger.info('sqlSelect df created.')  
    sqlSelect.persist().count()  

    s3_bucket_name = config.get('tgt_bucket_name')  

    logger.info("total records : {}".format(sqlSelect.count()))  
    folder_name = f'{CONTEXT_NAME}/{STRDATE}/'  
    s3_bucket_name = config.get('tgt_bucket_name')  
    s3_path = f's3://{s3_bucket_name}/{FOLDER_PATH}/'  

    logger.info(f'Writing to {s3_path} as csv')  

    sqlSelect \  
            .replace("", None) \  
            .coalesce(1) \  
            .write \  
            .mode('overwrite') \  
            .format('csv') \  
            .option("header", "false") \  
            .option("sep", "|") \  
            .option("quote", "") \  
            .option("escape", "") \  
            .option("nullValue", None) \  
            .save(s3_path)  

    my_bucket = s3.Bucket(s3_bucket_name)  

    file_number = 0  
    logger.info(f'Target path : {s3_bucket_name}/{TARGET_FILE}')  
    for obj in my_bucket.objects.filter(Prefix=f'{FOLDER_PATH}'):  
        source_filename = (obj.key).split('/')[-1]  
        logger.info(f'Source file name: {source_filename}')  
        copy_source = {  
            'Bucket': s3_bucket_name,  
            'Key': obj.key  
        }  
        file_number += 1  
        s3.meta.client.copy(copy_source, s3_bucket_name, TARGET_FILE + ".txt")  
        # --- the part I tried starts here ---
        zip_obj = s3.Object(bucket_name='s3_bucket_name', key='obj.key')
        buffer = BytesIO(zip_obj.get().read())  

        z = zipfile.ZipFile(buffer)  
        for filename in z.namelist():  
            file_info = z.getinfo(filename)  
            s3.meta.client.upload_fileobj(  
                z.open(filename),  
                Bucket=s3_bucket_name,  
                Key=obj.key  
            )
        # --- the part I tried ends here ---
        s3.Object(s3_bucket_name, obj.key).delete()  

The section between the "the part I tried" comments is what I added, expecting a file named [FILE_NAME.txt.gz] in the desired S3 path. But the error looks like this -

2023-04-28 12:56:43,414 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:
Traceback (most recent call last):
  File "/tmp/glue_extract_job.py", line 213, in <module>
    execute_script(environment=EXEC_ENVIRONMENT, logger=logger, isdebug=False)
  File "/tmp/glue_extract_job.py", line 131, in execute_script
    buffer = BytesIO(zip_obj.get().read())
  File "/home/spark/.local/lib/python3.7/site-packages/boto3/resources/factory.py", line 520, in do_action
    response = action(self, *args, **kwargs)
  File "/home/spark/.local/lib/python3.7/site-packages/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
  File "/home/spark/.local/lib/python3.7/site-packages/botocore/client.py", line 386, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/spark/.local/lib/python3.7/site-packages/botocore/client.py", line 705, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the GetObject operation: Access Denied

Any help would be appreciated!

Upvotes: 1

Views: 92

Answers (1)

parisni

Reputation: 1152

This is likely not a Spark problem, but boto3 not having access to S3: the GetObject call behind buffer = BytesIO(zip_obj.get().read()) is what fails with "GetObject operation: Access Denied".
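As a minimal sketch, assuming the Glue job's role does have s3:GetObject on the target bucket, the boto3 read would look like the following. Note that the posted code passes the literal strings 's3_bucket_name' and 'obj.key' rather than the variables, so it does not point at the object that was just written; the bucket and key below are placeholders standing in for the question's variables.

import zipfile
from io import BytesIO

import boto3

s3 = boto3.resource('s3')

# placeholders standing in for the question's variables
s3_bucket_name = 'my-target-bucket'              # config.get('tgt_bucket_name') in the question
object_key = 'some/prefix/part-00000-xxxx.csv'   # obj.key in the question

# pass the variables themselves, not the literal strings 's3_bucket_name' / 'obj.key'
zip_obj = s3.Object(bucket_name=s3_bucket_name, key=object_key)

# Object.get() returns a dict; the payload is the 'Body' streaming object
buffer = BytesIO(zip_obj.get()['Body'].read())

# zipfile.ZipFile(buffer) will only succeed if the object really is a zip archive
archive = zipfile.ZipFile(buffer)
print(archive.namelist())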

By the way, you can compress the CSV to gzip with Spark itself: just add .option("compression", "gzip") to the writer, and you won't have to use boto3 to compress the CSV file afterwards.
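For example, a minimal sketch of the question's write with that option added (Spark then emits a gzip-compressed part-*.csv.gz object under s3_path, which can be copied to the TARGET_FILE + ".txt.gz" key the same way the question already copies the plain part file):

# same writer as in the question, with gzip compression handled by Spark;
# sqlSelect and s3_path are the DataFrame and target path from the question
sqlSelect \
    .replace("", None) \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .format('csv') \
    .option("header", "false") \
    .option("sep", "|") \
    .option("compression", "gzip") \
    .save(s3_path)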

Upvotes: 0
