Reputation: 386
I am trying to unzip a file that is in an Azure ADLS Gen2 container through Azure Databricks PySpark. When I use ZipFile, I get a BadZipFile error or a FileNotFoundError.
I can read CSVs in the same folder, but not the zip files.
The zip filepath is the same filepath I get from dbutils.fs.ls(blob_folder_url).
Code:
import zipfile, os, io, re

# Azure Blob Storage details
storage_account_name = "<>"
container_name = "<>"
folder_path = "<>"

blob_folder_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
zip_file = blob_folder_url + 'batch1_weekly_catman_20241109.zip'

# List files in the specified blob folder
files = dbutils.fs.ls(blob_folder_url)

for file in files:
    # Check if the file is a ZIP file
    if file.name.endswith('.zip'):
        print(f"Processing ZIP file: {file.name}")

        # Read the ZIP file into memory
        zip_file_path = file.path
        zip_blob_data = dbutils.fs.head(zip_file_path)  # Read the ZIP file content

        # Unzip the file
        with zipfile.ZipFile(io.BytesIO(zip_blob_data.encode('utf-8')), 'r') as z:
            print('zipppppppper')

        # with zipfile.ZipFile(zip_file, 'r') as z:
        #     print('zipppppppper')
Error Messages:
Upvotes: -1
Views: 192
Reputation: 386
Here is the code I'm using to unzip, write the csv, and archive the zipped file.
%pip install azure-storage-blob
dbutils.library.restartPython()
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import tempfile, os, zipfile, re
def unzip_and_upload_to_blob(
    source_connection_string,
    source_container,
    source_zipped_files,
    dest_container,
    archive_folder_path
):
    for zipped_file in source_zipped_files:
        # Source blob client setup
        source_blob_service = BlobServiceClient.from_connection_string(source_connection_string)
        source_container_client = source_blob_service.get_container_client(source_container)
        source_blob_client = source_container_client.get_blob_client(zipped_file)

        # Destination blob client setup (using same connection string)
        dest_container_client = source_blob_service.get_container_client(dest_container)

        # Archive blob client setup
        archive_file_path = archive_folder_path + get_filename(zipped_file)
        archive_blob_client = source_container_client.get_blob_client(archive_file_path)

        # Create destination path with .csv extension
        dest_path = os.path.splitext(zipped_file)[0] + '.csv'

        # Download and process zip file in memory
        print(f"Downloading zip file from: {zipped_file}")
        blob_data = source_blob_client.download_blob()
        zip_bytes = blob_data.readall()
        zip_buffer = BytesIO(zip_bytes)

        # Create a temporary directory for extracted files
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract files to temporary directory
            print("Extracting zip file...")
            with zipfile.ZipFile(zip_buffer) as zip_ref:
                zip_ref.extractall(temp_dir)

            # Get list of files in temp directory
            extracted_files = []
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    if file.endswith('.csv'):  # Only process CSV files
                        local_file_path = os.path.join(root, file)
                        extracted_files.append(local_file_path)

            if not extracted_files:
                raise Exception("No CSV files found in the zip archive")

            # Upload the CSV file to destination
            if len(extracted_files) == 1:
                # If there's only one CSV file, upload it with the destination name
                with open(extracted_files[0], 'rb') as data:
                    print(f"Uploading to: {dest_path}")
                    dest_container_client.upload_blob(
                        name=dest_path,
                        data=data,
                        overwrite=True
                    )
                    print(f"Successfully uploaded to: {dest_path}")

                # Archive the zipped blob: copy it to the archive folder, then delete the original
                try:
                    # Copy the blob to the new location in the archive folder
                    copy_props = archive_blob_client.start_copy_from_url(source_blob_client.url)
                    copy_status = copy_props['copy_status']

                    # Wait for the copy to complete
                    while copy_status == "pending":
                        copy_status = archive_blob_client.get_blob_properties().copy.status

                    if copy_status == "success":
                        # Delete the original blob
                        source_blob_client.delete_blob()
                        print(f"Blob '{zipped_file}' moved to '{archive_file_path}' successfully.")
                    else:
                        print(f"Failed to copy blob: {zipped_file}")
                except Exception as e:
                    print(f"An error occurred while moving the blob: {e}")
            else:
                # If there are multiple CSV files, raise an exception
                raise Exception(f"Found multiple CSV files in zip archive: {len(extracted_files)}. Expected only one.")
Here is the code to run that function:
# unzip the files and upload unzipped files to blob, then delete the zipped files
unzip_and_upload_to_blob(
    source_connection_string=connection_string,
    source_container=source_container,
    source_zipped_files=zipped_files,
    dest_container=dest_container,
    archive_folder_path=archive_folder_path
)
Here is the code to get the list of zipped files:
def list_zip_files(connection_string, source_container, source_blob_prefix):
    # Create the BlobServiceClient object
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # Get the container client
    container_client = blob_service_client.get_container_client(source_container)

    zipped_files = []

    # List blobs in the container
    print(f"Listing .zip blobs in container '{source_container}':")
    try:
        blob_list = container_client.list_blobs(name_starts_with=source_blob_prefix)
        for blob in blob_list:
            if ".zip" in blob.name:
                zipped_files.append(blob.name)
                print(f"Blob name: {blob.name}, Blob size: {blob.size} bytes")
    except Exception as e:
        print(f"An error occurred: {e}")

    return zipped_files
Here is the code to run that function. The connection string is stored in an Azure Key Vault.
# Azure Blob Storage details
connection_string = dbutils.secrets.get(scope='KEY_VAULT_NAME', key='KEY_NAME')
storage_account_name = "STORAGE_ACCOUNT_NAME"
source_container = "SOURCE_CONTAINER_NAME"
source_blob_prefix = 'BLOB_PREFIX'
dest_container = "DESTINATION_CONTAINER_NAME"
folder_path = "SOURCE_FILE_PATH"
archive_folder_path = "ARCHIVE_FILE_PATH"
# get a list of zipped files in the weekly_sales folder:
zipped_files = list_zip_files(connection_string, source_container, source_blob_prefix)
Upvotes: 0
Reputation: 11489
You can use the below code to unzip one zip file and store the files back to the target location.
First, read the zip file as a Spark dataframe in the binaryFile format and store the binary content using collect() on the dataframe.
Now, apply ZipFile(io.BytesIO(...)) to this binary content and loop through its entries. In the loop, check whether the filename ends with .csv, and use dbutils.fs.put() to write it to the target location.
from zipfile import ZipFile
from io import BytesIO

source_file_path = "abfss://<zipcontainer>@<storage_account_name>.dfs.core.windows.net/<filename>.zip"
target_dir_path = "abfss://<target_container>@<storage_account_name>.dfs.core.windows.net/<targetfolder>/"

# Read the zip file as a binary Spark dataframe and collect its content
zip_df = spark.read.format("binaryFile").load(source_file_path)
zip_content = zip_df.select("content").collect()[0][0]

# Open the zip from memory and write each CSV entry to the target folder
with ZipFile(BytesIO(zip_content), 'r') as zipping:
    for fname in zipping.namelist():
        if fname.endswith(".csv"):
            f_content = zipping.read(fname)
            target_filepath = f"{target_dir_path}{fname}"
            print(fname)
            dbutils.fs.put(target_filepath, f_content.decode("utf-8"), overwrite=True)

print("Unzipped and saved")
Output:
Unzipped files in the target location:
The above code is for one zip file. You can use dbutils.fs.ls() to list all the zip files and apply the same code to each file, adjusting the target location for each zip file as per your requirement; a sketch of that loop is below.
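For example, a minimal sketch of looping over every zip in a folder with dbutils.fs.ls() and applying the same logic. The folder paths and the choice to write each zip's CSVs into a subfolder named after the zip are assumptions for illustration, not part of the answer above:
from zipfile import ZipFile
from io import BytesIO

# Assumed example paths; replace with your own container and folder names
source_dir_path = "abfss://<zipcontainer>@<storage_account_name>.dfs.core.windows.net/<zipfolder>/"
target_dir_path = "abfss://<target_container>@<storage_account_name>.dfs.core.windows.net/<targetfolder>/"

for f in dbutils.fs.ls(source_dir_path):
    if not f.name.endswith(".zip"):
        continue

    # Read each zip as binary content, as in the single-file example above
    zip_df = spark.read.format("binaryFile").load(f.path)
    zip_content = zip_df.select("content").collect()[0][0]

    # Write each CSV entry under a subfolder named after the zip (assumed convention)
    with ZipFile(BytesIO(zip_content), 'r') as zipping:
        for fname in zipping.namelist():
            if fname.endswith(".csv"):
                target_filepath = f"{target_dir_path}{f.name[:-4]}/{fname}"
                dbutils.fs.put(target_filepath, zipping.read(fname).decode("utf-8"), overwrite=True)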
Upvotes: 1