Mariah Akinbi

Reputation: 386

Unzip file in Azure Blob storage from Databricks

I am trying to unzip a file that is in an Azure ADLS Gen2 container using PySpark in Azure Databricks. When I use ZipFile, I get a BadZipFile error or a FileNotFoundError.

I can read CSVs in the same folder, but not the zip files.

The zip filepath is the same filepath I get from dbutils.fs.ls(blob_folder_url).

BadZipFile code: (screenshot)

FileNotFoundError code: (screenshot)

Reading a CSV code: (screenshot)

Code:

import zipfile, os, io, re

# Azure Blob Storage details
storage_account_name = "<>"
container_name = "<>"
folder_path = "<>"

blob_folder_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
zip_file = blob_folder_url + 'batch1_weekly_catman_20241109.zip'

# List files in the specified blob folder
files = dbutils.fs.ls(blob_folder_url)

for file in files:
    # Check if the file is a ZIP file
    if file.name.endswith('.zip'):
        print(f"Processing ZIP file: {file.name}")

        # Read the ZIP file into memory
        zip_file_path = file.path
        zip_blob_data = dbutils.fs.head(zip_file_path)  # Read the ZIP file content

        # Unzip the file
        with zipfile.ZipFile(io.BytesIO(zip_blob_data.encode('utf-8')), 'r') as z:
            print('zipppppppper')
        # with zipfile.ZipFile(zip_file, 'r') as z:
        #     print('zipppppppper')  

Error Messages:

  1. BadZipFile: File is not a zip file
  2. FileNotFoundError: [Errno 2] No such file or directory

Upvotes: -1

Views: 192

Answers (2)

Mariah Akinbi

Reputation: 386

Here is the code I'm using to unzip, write the CSV, and archive the zipped file.

%pip install azure-storage-blob

dbutils.library.restartPython()

from azure.storage.blob import BlobServiceClient
from io import BytesIO
import tempfile, os, zipfile, re

def unzip_and_upload_to_blob(
    source_connection_string,
    source_container,
    source_zipped_files,
    dest_container,
    archive_folder_path
):
    for zipped_file in source_zipped_files:
        # Source blob client setup
        source_blob_service = BlobServiceClient.from_connection_string(source_connection_string)
        source_container_client = source_blob_service.get_container_client(source_container)
        source_blob_client = source_container_client.get_blob_client(zipped_file)
        
        # Destination blob client setup (using same connection string)
        dest_container_client = source_blob_service.get_container_client(dest_container)
        
        # Archive Blob Client setup
        archive_file_path = archive_folder_path + get_filename(zipped_file)
        archive_blob_client = source_container_client.get_blob_client(archive_file_path)

        
        # Create destination path with .csv extension
        dest_path = os.path.splitext(zipped_file)[0] + '.csv'
        
        # Download and process zip file in memory
        print(f"Downloading zip file from: {zipped_file}")
        blob_data = source_blob_client.download_blob()
        zip_bytes = blob_data.readall()
        zip_buffer = BytesIO(zip_bytes)
        
        # Create a temporary directory for extracted files
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract files to temporary directory
            print("Extracting zip file...")
            with zipfile.ZipFile(zip_buffer) as zip_ref:
                zip_ref.extractall(temp_dir)
                
            # Get list of files in temp directory
            extracted_files = []
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    if file.endswith('.csv'):  # Only process CSV files
                        local_file_path = os.path.join(root, file)
                        extracted_files.append(local_file_path)
            
            if not extracted_files:
                raise Exception("No CSV files found in the zip archive")
            
            # Upload the CSV file to destination
            if len(extracted_files) == 1:
                # If there's only one CSV file, upload it with the destination name
                with open(extracted_files[0], 'rb') as data:
                    print(f"Uploading to: {dest_path}")
                    dest_container_client.upload_blob(
                        name=dest_path,
                        data=data,
                        overwrite=True
                    )
                    print(f"Successfully uploaded to: {dest_path}")

                    # Archive the zipped blob: copy it to the archive folder, then delete the original
                    try:
                        # Start a server-side copy to the archive location
                        copy_props = archive_blob_client.start_copy_from_url(source_blob_client.url)
                        copy_state = copy_props['copy_status']

                        # Wait for the copy to complete
                        while copy_state == "pending":
                            copy_state = archive_blob_client.get_blob_properties().copy.status

                        if copy_state == "success":
                            # Delete the original blob
                            source_blob_client.delete_blob()
                            print(f"Blob '{zipped_file}' moved to '{archive_file_path}' successfully.")
                        else:
                            print(f"Failed to copy blob: {zipped_file}")
                    except Exception as e:
                        print(f"An error occurred while moving the blob: {e}")
            else:
                # If there are multiple CSV files, raise an exception
                raise Exception(f"Found multiple CSV files in zip archive: {len(extracted_files)}. Expected only one.")

Here is the code to run that function:

# unzip the files and upload unzipped files to blob, then delete the zipped files
unzip_and_upload_to_blob(
    source_connection_string=connection_string,
    source_container=source_container,
    source_zipped_files=zipped_files,
    dest_container=dest_container,
    archive_folder_path=archive_folder_path
)

Here is the code to get the list of zipped files:

def list_zip_files(connection_string, source_container, source_blob_prefix):
    # Create the BlobServiceClient object
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # Get the container client
    container_client = blob_service_client.get_container_client(source_container)

    zipped_files = []
    # List blobs in the container
    print(f"Listing .zip blobs in container '{source_container}':")
    try:
        blob_list = container_client.list_blobs(name_starts_with=source_blob_prefix)
        for blob in blob_list:
            if ".zip" in blob.name:
                zipped_files.append(blob.name)
                print(f"Blob name: {blob.name}, Blob size: {blob.size} bytes")
    except Exception as e:
        print(f"An error occurred: {e}")
    return zipped_files

Here is the code to run that function. The connection string is stored in an Azure Key Vault.

# Azure Blob Storage details
connection_string = dbutils.secrets.get(scope='KEY_VAULT_NAME', key='KEY_NAME')
storage_account_name = "STORAGE_ACCOUNT_NAME"
source_container = "SOURCE_CONTAINER_NAME"
source_blob_prefix = 'BLOB_PREFIX'
dest_container = "DESTINATION_CONTAINER_NAME"
folder_path = "SOURCE_FILE_PATH"
archive_folder_path = "ARCHIVE_FILE_PATH"

# get a list of zipped files in the weekly_sales folder:
zipped_files = list_zip_files(connection_string, source_container, source_blob_prefix)

Upvotes: 0

Rakesh Govindula

Reputation: 11489

You can use the code below to unzip one zip file and write the extracted files to the target location.

First, read the zip file as a Spark DataFrame in the binaryFile format and extract the binary content using collect() on the dataframe.

Then wrap this binary content in ZipFile(io.BytesIO(...)) and loop through its entries. In the loop, check whether the filename ends with .csv and use dbutils.fs.put() to write it to the target location.

from zipfile import ZipFile
from io import BytesIO

source_file_path = "abfss://<zipcontainer>@<storage_account_name>.dfs.core.windows.net/<filename>.zip"
target_dir_path = "abfss://<target_container>@<storage_account_name>.dfs.core.windows.net/<targetfolder>/"

zip_df = spark.read.format("binaryFile").load(source_file_path)

zip_content = zip_df.select("content").collect()[0][0]

with ZipFile(BytesIO(zip_content), 'r') as zipping:
    for fname in zipping.namelist():
        if fname.endswith(".csv"):
            f_content = zipping.read(fname)
            target_filepath = f"{target_dir_path}{fname}"
            print(fname)
            dbutils.fs.put(target_filepath, f_content.decode("utf-8"), overwrite=True)

print("Unzipped and saved")

Output:

(screenshot)

Unzipped files in the target location:

(screenshot)

The above code is for one zip file. You can use dbutils.fs.ls() to list all the zip files and apply the same code to each one, adjusting the target location per zip file as your requirement dictates; a sketch of that loop follows.
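
A minimal sketch of that loop, assuming every zip is extracted into the same target folder (the container names and paths are placeholders to fill in):

source_dir_path = "abfss://<zipcontainer>@<storage_account_name>.dfs.core.windows.net/<folder>/"
target_dir_path = "abfss://<target_container>@<storage_account_name>.dfs.core.windows.net/<targetfolder>/"

# Loop over every .zip blob in the source folder and reuse the same unzip logic
for f in dbutils.fs.ls(source_dir_path):
    if f.name.endswith(".zip"):
        zip_df = spark.read.format("binaryFile").load(f.path)
        zip_content = zip_df.select("content").collect()[0][0]
        with ZipFile(BytesIO(zip_content), 'r') as zipping:
            for fname in zipping.namelist():
                if fname.endswith(".csv"):
                    # Assumes the CSVs are UTF-8 text, as in the example above
                    dbutils.fs.put(f"{target_dir_path}{fname}", zipping.read(fname).decode("utf-8"), overwrite=True)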

Upvotes: 1
