Reputation: 3601
Is there a Python library that allows manipulation of zip archives in memory, without having to use actual disk files?
The ZipFile library does not allow you to update the archive. The only way seems to be to extract it to a directory, make your changes, and create a new zip from that directory. I want to modify zip archives without disk access, because I'll be downloading them, making changes, and uploading them again, so I have no reason to store them.
Something similar to Java's ZipInputStream/ZipOutputStream would do the trick, although any interface at all that avoids disk access would be fine.
Upvotes: 133
Views: 135223
Reputation: 6746
import io
import zipfile
# Members to store: (name inside the archive, in-memory source of its bytes).
members = (
    ('1.txt', io.BytesIO(b'111')),
    ('2.txt', io.BytesIO(b'222')),
)

# The archive is assembled inside this buffer; nothing touches the disk yet.
zip_buffer = io.BytesIO()
with zipfile.ZipFile(
    zip_buffer, mode="a", compression=zipfile.ZIP_DEFLATED, allowZip64=False
) as zip_file:
    for file_name, data in members:
        zip_file.writestr(file_name, data.getvalue())

# Optional: dump the finished archive to disk. This happens after the
# ZipFile has been closed, so the buffer already holds the complete archive.
with open('C:/archive.zip', 'wb') as f:
    f.write(zip_buffer.getvalue())
Upvotes: 140
Reputation: 1
It's important to note that if you want to use the newly created in-memory ZIP archive outside of Python, such as saving it to a local disk or sending it through a POST request, it needs to have the end-of-central-directory record written to it; otherwise, it won't be recognized as a valid ZIP file.
This would look like (for Python 3.11)
# (name inside the archive, content) pairs; the bytes here are a placeholder.
files_to_add = [("example_dir/example_file.txt", b"example file contents")]

with (
    io.BytesIO() as raw,
    zipfile.ZipFile(raw, "a", zipfile.ZIP_DEFLATED, False) as archive,
):
    for file_name, file_data in files_to_add:
        archive.writestr(file_name, file_data)
    # THIS is REQUIRED: closing writes the end-of-central-directory record.
    # It must happen before the bytes are read, and while `raw` is still open.
    archive.close()
    # The archive bytes live in the BytesIO buffer, not in the ZipFile object.
    requests.post(addr, files={"file": ("zip_name.zip", raw.getvalue())})
Upvotes: 0
Reputation: 27052
You can use the library libarchive in Python through ctypes - it offers ways of manipulating ZIP data in memory, with a focus on streaming (at least historically).
Say we want to uncompress ZIP files on the fly while downloading from an HTTP server. The below code
from contextlib import contextmanager
from ctypes import CFUNCTYPE, POINTER, create_string_buffer, cdll, byref, c_ssize_t, c_char_p, c_int, c_void_p, c_char
from ctypes.util import find_library
import httpx
def get_zipped_chunks(url, chunk_size=6553):
    """Stream the response body of a GET request to *url* as bytes chunks.

    The request is streamed, so the body is yielded piece by piece. The
    connection is released when the generator is exhausted or closed.

    :param url: address of the (zipped) resource to download
    :param chunk_size: size in bytes of the chunks requested from httpx
    """
    # NOTE(review): the default of 6553 looks like a typo for 65536; it is
    # kept unchanged so existing callers see the same signature.
    with httpx.stream('GET', url) as r:
        # chunk_size was previously accepted but silently ignored.
        yield from r.iter_bytes(chunk_size=chunk_size)
def stream_unzip(zipped_chunks, chunk_size=65536):
    """Decompress an archive on the fly using libarchive via ctypes.

    :param zipped_chunks: iterable of bytes objects holding the archive data
    :param chunk_size: size of the buffer used to read uncompressed data
    :return: generator of ``(pathname, chunks)`` pairs, one per archive
        entry; ``pathname`` is the bytes value reported by libarchive and
        ``chunks`` is a generator of that entry's uncompressed bytes
    :raises Exception: if libarchive cannot allocate its structures or
        reports an error while reading

    The chunks of one entry must be consumed before moving on to the next
    entry, because every entry is read from the same underlying stream.
    """
    # Library
    libarchive = cdll.LoadLibrary(find_library('archive'))

    # Callback types
    open_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)
    read_callback_type = CFUNCTYPE(c_ssize_t, c_void_p, c_void_p, POINTER(POINTER(c_char)))
    close_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)

    # Function types
    libarchive.archive_read_new.restype = c_void_p
    libarchive.archive_read_open.argtypes = [c_void_p, c_void_p, open_callback_type, read_callback_type, close_callback_type]
    libarchive.archive_read_finish.argtypes = [c_void_p]
    libarchive.archive_entry_new.restype = c_void_p
    libarchive.archive_read_next_header.argtypes = [c_void_p, c_void_p]
    libarchive.archive_read_support_compression_all.argtypes = [c_void_p]
    libarchive.archive_read_support_format_all.argtypes = [c_void_p]
    libarchive.archive_entry_pathname.argtypes = [c_void_p]
    libarchive.archive_entry_pathname.restype = c_char_p
    libarchive.archive_read_data.argtypes = [c_void_p, POINTER(c_char), c_ssize_t]
    libarchive.archive_read_data.restype = c_ssize_t
    libarchive.archive_error_string.argtypes = [c_void_p]
    libarchive.archive_error_string.restype = c_char_p

    ARCHIVE_EOF = 1
    ARCHIVE_OK = 0

    it = iter(zipped_chunks)
    compressed_bytes = None  # Make sure not garbage collected

    @contextmanager
    def get_archive():
        # Allocate the read handle and guarantee it is released afterwards.
        archive = libarchive.archive_read_new()
        if not archive:
            raise Exception('Unable to allocate archive')
        try:
            yield archive
        finally:
            libarchive.archive_read_finish(archive)

    def read_callback(archive, client_data, buffer):
        # Hand libarchive the next non-empty chunk of compressed input.
        nonlocal compressed_bytes
        # Returning 0 signals end-of-input, so an empty chunk in the middle
        # of the stream must be skipped rather than passed through.
        for chunk in it:
            if chunk:
                break
        else:
            return 0
        compressed_bytes = create_string_buffer(chunk)
        buffer[0] = compressed_bytes
        # create_string_buffer appends a NUL terminator that is not data.
        return len(compressed_bytes) - 1

    def uncompressed_chunks(archive):
        # Yield the uncompressed bytes of the entry currently being read.
        uncompressed_bytes = create_string_buffer(chunk_size)
        while (num := libarchive.archive_read_data(archive, uncompressed_bytes, len(uncompressed_bytes))) > 0:
            # .raw, not .value: .value stops at the first NUL byte and would
            # silently truncate binary content.
            yield uncompressed_bytes.raw[:num]
        if num < 0:
            raise Exception(libarchive.archive_error_string(archive))

    # Keep references to the callback objects for as long as libarchive may
    # call them; ctypes does not keep temporaries alive after the call that
    # registered them returns.
    open_callback = open_callback_type(0)
    read_callback_ref = read_callback_type(read_callback)
    close_callback = close_callback_type(0)

    with get_archive() as archive:
        libarchive.archive_read_support_compression_all(archive)
        libarchive.archive_read_support_format_all(archive)
        libarchive.archive_read_open(
            archive, 0,
            open_callback, read_callback_ref, close_callback,
        )
        entry = c_void_p(libarchive.archive_entry_new())
        if not entry:
            raise Exception('Unable to allocate entry')

        while (status := libarchive.archive_read_next_header(archive, byref(entry))) == ARCHIVE_OK:
            yield (libarchive.archive_entry_pathname(entry), uncompressed_chunks(archive))

        if status != ARCHIVE_EOF:
            raise Exception(libarchive.archive_error_string(archive))
can be used as follows to do that
zipped_chunks = get_zipped_chunks('https://domain.test/file.zip')
files = stream_unzip(zipped_chunks)
# Iterate the generator created above instead of calling stream_unzip a
# second time on the same chunk iterator.
for name, uncompressed_chunks in files:
    print(name)
    # Consume every chunk of this entry before advancing to the next one.
    for uncompressed_chunk in uncompressed_chunks:
        print(uncompressed_chunk)
In fact since libarchive supports multiple archive formats, and nothing above is particularly ZIP-specific, it may well work with other formats.
Upvotes: 0
Reputation: 6569
Helper to create an in-memory zip file with multiple files, based on data like {'1.txt': 'string', '2.txt': b'bytes'}
import io, zipfile
def prepare_zip_file_content(file_name_content: dict) -> bytes:
    """Pack a name -> content mapping into a ZIP archive held in memory.

    ``file_name_content`` maps archive member names to their contents,
    e.g. ``{'1.txt': 'string', '2.txt': b'bytes'}``.

    Returns the raw archive bytes, ready to be saved with
    ``open('C:/1.zip', 'wb') as f: f.write(bytes)``.
    """
    buffer = io.BytesIO()
    archive = zipfile.ZipFile(
        buffer, mode="a", compression=zipfile.ZIP_DEFLATED, allowZip64=False
    )
    # Leaving the context closes the archive, which completes it in `buffer`.
    with archive:
        for member_name in file_name_content:
            archive.writestr(member_name, file_name_content[member_name])
    # getvalue() returns the whole buffer regardless of the stream position.
    return buffer.getvalue()
Upvotes: 2
Reputation: 519
I am using Flask to create an in-memory zipfile and return it as a download. This builds on the example above from Vladimir. The seek(0) call
took a while to figure out.
import io
import zipfile
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
for file_name, data in [('1.txt', io.BytesIO(b'111')), ('2.txt', io.BytesIO(b'222'))]:
zip_file.writestr(file_name, data.getvalue())
zip_buffer.seek(0)
return send_file(zip_buffer, attachment_filename='filename.zip', as_attachment=True)
Upvotes: 11
Reputation: 27052
I want to modify zip archives without disk access, because I'll be downloading them, making changes, and uploading them again, so I have no reason to store them
This is possible using the two libraries https://github.com/uktrade/stream-unzip and https://github.com/uktrade/stream-zip (full disclosure: written by me). And depending on the changes, you might not even have to store the entire zip in memory at once.
Say you just want to download, unzip, zip, and re-upload. Slightly pointless, but you could slot in some changes to the unzipped content:
from datetime import datetime
import httpx
from stream_unzip import stream_unzip
from stream_zip import stream_zip, ZIP_64
def get_source_bytes_iter(url):
    """Yield the body of a streamed GET request to *url*, chunk by chunk."""
    with httpx.stream('GET', url) as response:
        for chunk in response.iter_bytes():
            yield chunk
def get_target_files(files):
    """Adapt 3-tuples from *files* into the 5-tuples handed to stream_zip.

    Each input item is unpacked as ``(name, <ignored>, chunks)``; each item
    yielded is ``(decoded name, modified_at, perms, ZIP_64, chunks)``.
    """
    # stream-unzip doesn't expose perms or modified_at, but stream-zip
    # requires them, so every member gets the same timestamp and mode.
    stamp = datetime.now()
    mode = 0o600
    for member in files:
        raw_name, _unused, body = member
        # Could change name, manipulate chunks, skip a file, or yield a new file
        yield (raw_name.decode(), stamp, mode, ZIP_64, body)
source_url = 'https://source.test/file.zip'
target_url = 'https://target.test/file.zip'

# Every stage below is an iterator built on the previous one.
source_bytes_iter = get_source_bytes_iter(source_url)
source_files = stream_unzip(source_bytes_iter)
target_files = get_target_files(source_files)
target_bytes_iter = stream_zip(target_files)

# Raw bytes (including byte iterators) belong in `content=`; `data=` is for
# form fields, and passing raw content through it is deprecated in httpx.
httpx.put(target_url, content=target_bytes_iter)
Upvotes: 1
Reputation: 134257
From the article In-Memory Zip in Python:
Below is a post of mine from May of 2008 on zipping in memory with Python, re-posted since Posterous is shutting down.
I recently noticed that there is a for-pay component available to zip files in-memory with Python. Considering this is something that should be free, I threw together the following code. It has only gone through very basic testing, so if anyone finds any errors, let me know and I’ll update this.
import io
import zipfile

try:
    import StringIO  # Python 2 only; InMemoryZip itself now uses io.BytesIO
except ImportError:  # Python 3 has no StringIO module
    StringIO = None
class InMemoryZip(object):
    '''Builds a ZIP archive in an in-memory buffer.

    Files are added with append(), which returns the instance so calls can
    be chained. The finished archive is available as bytes via read() or
    can be saved to disk with writetofile().
    '''

    def __init__(self):
        # Create the in-memory file-like object. ZIP data is binary, so a
        # bytes buffer is used; io.BytesIO exists on Python 2 and Python 3.
        self.in_memory_zip = io.BytesIO()

    def append(self, filename_in_zip, file_contents):
        '''Appends a file with name filename_in_zip and contents of
        file_contents to the in-memory zip. Returns self.'''
        # Get a handle to the in-memory zip in append mode. The with block
        # closes it explicitly, which is what writes the central directory.
        with zipfile.ZipFile(self.in_memory_zip, "a", zipfile.ZIP_DEFLATED, False) as zf:
            # Write the file to the in-memory zip
            zf.writestr(filename_in_zip, file_contents)

            # Mark the files as having been created on Windows so that
            # Unix permissions are not inferred as 0000
            for zfile in zf.filelist:
                zfile.create_system = 0

        return self

    def read(self):
        '''Returns the contents of the in-memory zip as bytes.'''
        self.in_memory_zip.seek(0)
        return self.in_memory_zip.read()

    def writetofile(self, filename):
        '''Writes the in-memory zip to a file.'''
        # Binary mode: text mode would corrupt the archive on Windows.
        with open(filename, "wb") as f:
            f.write(self.read())
if __name__ == "__main__":
    # Smoke test: add two small text members, then save the archive to disk.
    archive = InMemoryZip()
    archive.append("test.txt", "Another test")
    archive.append("test2.txt", "Still another")
    archive.writetofile("test.zip")
Upvotes: 60
Reputation: 76812
The example Ethier provided has several problems, some of them major; for instance, the archive is reopened on every append instead of being kept open as an InMemoryZip attribute.
An updated version is available if you install ruamel.std.zipfile
(of which I am the author). After
pip install ruamel.std.zipfile
or including the code for the class from here, you can do:
import ruamel.std.zipfile as zipfile
# Run a test
# Bind the new archive to a name; the calls below operate on `imz`.
imz = zipfile.InMemoryZipFile()
imz.append("test.txt", "Another test").append("test2.txt", "Still another")
imz.writetofile("test.zip")
You can alternatively write the contents using imz.data
to any place you need.
You can also use the with
statement, and if you provide a filename, the contents of the ZIP will be written on leaving that context:
# Use the in-memory archive as a context manager, passing a filename; the
# two append calls are chained on the object bound to `imz`.
with zipfile.InMemoryZipFile('test.zip') as imz:
    imz.append("test.txt", "Another test").append("test2.txt", "Still another")
Because of the delayed writing to disk, you can actually read from an old test.zip
within that context.
Upvotes: 22
Reputation: 42694
According to the Python docs:
class zipfile.ZipFile(file[, mode[, compression[, allowZip64]]])
Open a ZIP file, where file can be either a path to a file (a string) or a file-like object.
So, to open the file in memory, just create a file-like object (perhaps using BytesIO).
# Wrap the raw archive bytes in a seekable in-memory file object ...
file_like_object = io.BytesIO(initial_bytes=my_zip_data)
# ... and open it for reading exactly as if it were a file on disk.
zipfile_ob = zipfile.ZipFile(file=file_like_object, mode="r")
Upvotes: 125