Reputation: 154484
I'm trying to figure out the best way to compress a stream with Python's zlib
.
I've got a file-like input stream (input
, below) and an output function which accepts a file-like (output_function
, below):
# Current behaviour: the raw input stream is handed straight to the consumer.
with open("file") as input:
    output_function(input)
And I'd like to gzip-compress input
chunks before sending them to output_function
:
# Desired behaviour: gzip_stream is the wrapper this question asks for; it
# should expose the gzip-compressed form of `input` as a file-like object.
with open("file") as input:
    output_function(gzip_stream(input))
It looks like the gzip module assumes that either the input or the output will be a gzip'd file-on-disk… So I assume that the zlib module is what I want.
However, it doesn't natively offer a simple way to create a stream file-like… And the stream-compression it does support comes by way of manually adding data to a compression buffer, then flushing that buffer.
Of course, I could write a wrapper around zlib.Compress.compress
and zlib.Compress.flush
(Compress
is returned by zlib.compressobj()
), but I'd be worried about getting buffer sizes wrong, or something similar.
So, what's the simplest way to create a streaming, gzip-compressing file-like with Python?
Edit: To clarify, the input stream and the compressed output stream are both too large to fit in memory, so something like output_function(StringIO(zlib.compress(input.read())))
doesn't really solve the problem.
Upvotes: 35
Views: 26185
Reputation: 16677
An even cleaner & more generalized version made of reusable components:
# file object -> iterator of chunks -> iterator of gzip-compressed chunks
gzipped_iter = igzip(io_iter(input_file_obj))
# compress on a background thread, then expose the result as a file object again
gzipped_file_obj = iter_io(prefetch(gzipped_iter))
The functions above are from my gist:
iter_io
and io_iter
provide transparent conversion to/from Iterable[AnyStr]
<-> SupportsRead[AnyStr]
igzip
does streaming gzip compression
prefetch
concurrently pulls from an underlying iterable via a thread, yielding to the consumer as normal, for concurrent read/write
def as_bytes(s: str | bytes):
if type(s) not in [str, bytes]:
raise TypeError
return s.encode() if isinstance(s, str) else s
def iter_io(iterable: Iterable[AnyStr], buffer_size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Wrap an iterable of str/bytes chunks in a buffered, readable binary file obj.

    Falsy chunks are skipped; every other chunk is converted with as_bytes()
    before it is served to the reader.

    Example:
        iter_io(['abc', 'def', 'g']).read() == b'abcdefg'
        iter_io([b'abcd', b'efg']).read(5) == b'abcde'
    """
    class IterIO(io.RawIOBase):
        """Raw reader that hands out the iterable's chunks piece by piece."""

        def __init__(self, iterable: Iterable[AnyStr]):
            # Tail of the last chunk that did not fit into the caller's buffer.
            self._leftover = b''
            self._iterable = (as_bytes(s) for s in iterable if s)

        def readable(self):
            return True

        def readinto(self, buf):
            pending = self._leftover
            if not pending:
                try:
                    pending = next(self._iterable)
                except StopIteration:
                    return 0  # indicate EOF
            room = len(buf)
            head = pending[:room]
            # Keep whatever did not fit for the next call.
            self._leftover = pending[room:]
            buf[:len(head)] = head
            return len(head)

    raw = IterIO(iterable)
    return io.BufferedReader(raw, buffer_size=buffer_size)
def io_iter(fo: SupportsRead[AnyStr], size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Return an iterator over a file obj's contents, read `size` units at a time.

    Iteration stops at the first read that returns an empty result.

    Example:
        list(io_iter(io.StringIO('abcdefg'), 3)) == ['abc', 'def', 'g']
        list(io_iter(io.BytesIO(b'abcdefg'), 4)) == [b'abcd', b'efg']

    Usage notes/TODO:
        * file obj isn't closed, fix /w keep_open=False and an internal contextmanager
    """
    # read(0) gives the empty value of the stream's own type ('' or b''),
    # which doubles as the end-of-stream sentinel.
    eof = fo.read(0)

    def next_chunk():
        return fo.read(size)

    return iter(next_chunk, eof)
def igzip(chunks: Iterable[AnyStr], level=zlib.Z_DEFAULT_COMPRESSION):
    """
    Streaming gzip: lazily compress an iterable of bytes or str (utf8).

    Returns an iterator of non-empty compressed byte strings; nothing is
    compressed until the iterator is consumed.

    Example:
        gzipped_bytes_iter = igzip(['hello ', 'world!'])
        gzip.decompress(b''.join(gzipped_bytes_iter)).decode() == 'hello world!'
    """
    def compressed_pieces():
        # Adding 16 to wbits asks zlib for a gzip header and trailer.
        compressor = zlib.compressobj(level=level, wbits=zlib.MAX_WBITS + 0b10000)
        for chunk in chunks:
            yield compressor.compress(as_bytes(chunk))
        yield compressor.flush()

    # compress() may return b'' while zlib is still buffering; drop those.
    return filter(None, compressed_pieces())
def prefetch(iterable: Iterable[Any], n: int = 1) -> Iterator[Any]:
    """
    Prefetch an iterable via thread, yielding original contents as normal.

    Up to `n` items are buffered in a queue while the consumer is busy.
    If the underlying iterable raises, the items produced so far are yielded
    and the exception is then re-raised in the consumer (instead of leaving
    the consumer blocked forever on an empty queue).

    Example:
        def slow_produce(*args):
            for x in args:
                time.sleep(1)
                yield x

        def slow_consume(iterable):
            for _ in iterable:
                time.sleep(1)

        slow_consume(prefetch(slow_produce('a', 'b')))  # takes 3 sec, not 4

        # Prefetch
        # produce: | 'a' | 'b' |
        # consume:       | 'a' | 'b' |
        # seconds: 0 --- 1 --- 2 --- 3

        # No prefetch
        # produce: | 'a' |     | 'b' |
        # consume:       | 'a' |     | 'b' |
        # seconds: 0 --- 1 --- 2 --- 3 --- 4

    Usage notes/TODO:
        * mem leak: Thread is GC'd only after iterable is fully consumed, fix /w __del__
    """
    queue = Queue(n)
    finished = object()  # unique end-of-stream marker
    errors = []  # filled by the producer thread if the iterable raises

    def produce():
        try:
            for x in iterable:
                queue.put(x)
        except Exception as exc:
            errors.append(exc)
        finally:
            # Always signal the end so the consumer never blocks forever.
            queue.put(finished)

    t = Thread(target=produce, daemon=True)
    t.start()
    while True:
        item = queue.get()
        if item is finished:
            break
        yield item
    if errors:
        # Surface the producer's failure where the caller can handle it.
        raise errors[0]
Upvotes: 0
Reputation: 1244
This works (at least in python 3):
with s3.open(path, 'wb') as f:
    # Positional arguments are (filename, mode, compresslevel, fileobj):
    # the compressed bytes go to `f`, `filename` is only used for the header.
    # The inner `with` closes the GzipFile (writing the gzip trailer) even if
    # a write fails, and does so before `f` itself is closed.
    with gzip.GzipFile(filename, 'wb', 9, f) as gz:
        gz.write(b'hello')
        gz.flush()
Here it writes to s3fs's file object with a gzip compression on it.
The magic is the f
parameter, which is GzipFile's fileobj
. You have to provide a file name for gzip's header.
Upvotes: 0
Reputation: 116
The gzip module supports compressing to a file-like object: pass a fileobj parameter to GzipFile, as well as a filename. The filename you pass in doesn't need to exist, but the gzip header has a filename field which needs to be filled out.
Update
This answer does not work. Example:
# tmp/try-gzip.py
# Reproduction script: wraps stdin in a GzipFile and copies whatever read()
# returns to stdout.
import sys
import gzip
# Only fileobj is supplied; no filename or mode is passed.
fd=gzip.GzipFile(fileobj=sys.stdin)
# With piped stdin on Python 2.7 this read() fails: gzip calls
# self.fileobj.tell() and the pipe raises "IOError: [Errno 29] Illegal seek"
# (see the traceback in the output below).
sys.stdout.write(fd.read())
output:
===> cat .bash_history | python tmp/try-gzip.py > tmp/history.gzip
Traceback (most recent call last):
File "tmp/try-gzip.py", line 7, in <module>
sys.stdout.write(fd.read())
File "/usr/lib/python2.7/gzip.py", line 254, in read
self._read(readsize)
File "/usr/lib/python2.7/gzip.py", line 288, in _read
pos = self.fileobj.tell() # Save current position
IOError: [Errno 29] Illegal seek
Upvotes: 6
Reputation: 1410
Here is a cleaner, non-self-referencing version based on Ricardo Cárdenes' very helpful answer.
from gzip import GzipFile
from collections import deque
# Number of bytes requested from the input stream per read() call (16 KiB).
CHUNK = 16 * 1024
class Buffer(object):
    """
    Minimal in-memory FIFO with a file-like write()/read() interface.

    Chunks passed to write() are queued; read() hands them back in order.
    len(buffer) is the number of bytes/characters currently queued.
    Works with bytes (what GzipFile writes on Python 3) as well as str.
    """

    def __init__(self):
        self.__buf = deque()
        self.__size = 0

    def __len__(self):
        return self.__size

    def write(self, data):
        """Queue one chunk of data."""
        self.__buf.append(data)
        self.__size += len(data)

    def read(self, size=-1):
        """
        Remove and return up to `size` units from the front of the queue.

        A negative size returns everything that is currently queued.
        """
        if size < 0:
            size = self.__size
        ret_list = []
        while size > 0 and self.__buf:
            s = self.__buf.popleft()
            size -= len(s)
            ret_list.append(s)
        if size < 0:
            # The last chunk overshot the request by -size units: keep only
            # its head and push the tail back to the front of the queue.
            ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
            self.__buf.appendleft(remainder)
        # Join with an empty value of the chunks' own type, so bytes chunks
        # are not joined with a str separator (a TypeError on Python 3).
        empty = ret_list[0][:0] if ret_list else b''
        ret = empty.join(ret_list)
        self.__size -= len(ret)
        return ret

    def flush(self):
        """No-op; present so the object can stand in for a file."""
        pass

    def close(self):
        """No-op; queued data stays readable after close()."""
        pass
class GzipCompressReadStream(object):
    """
    Read-only file-like object that yields the gzip-compressed form of `fileobj`.

    Input is pulled CHUNK bytes at a time and fed to a GzipFile that writes
    into an internal Buffer; read() drains that Buffer.
    """

    def __init__(self, fileobj):
        self.__input = fileobj
        self.__buf = Buffer()
        self.__gzip = GzipFile(None, mode='wb', fileobj=self.__buf)

    def read(self, size=-1):
        """
        Return up to `size` bytes of compressed data (everything if size < 0).
        """
        while True:
            # Enough compressed data is already buffered for this request.
            if 0 <= size <= len(self.__buf):
                break
            block = self.__input.read(CHUNK)
            if block:
                self.__gzip.write(block)
                continue
            # Input exhausted: closing the GzipFile finishes the gzip stream.
            self.__gzip.close()
            break
        return self.__buf.read(size)
Advantages:
Upvotes: 11
Reputation: 9172
It's quite kludgy (self-referencing, etc.; I just spent a few minutes writing it, so it's nothing really elegant), but it does what you want if you're still interested in using gzip
instead of zlib
directly.
Basically, GzipWrap
is a (very limited) file-like object that produces a gzipped file out of a given iterable (e.g., a file-like object, a list of strings, any generator...)
Of course, it produces binary so there was no sense in implementing "readline".
You should be able to expand it to cover other cases or to be used as an iterable object itself.
from gzip import GzipFile
class GzipWrap(object):
    """
    Very limited file-like object that serves a gzip stream built from an iterable.

    The wrapper passes itself to GzipFile as `fileobj`, so everything the
    GzipFile writes lands in `self.buffer`, from which read() is served.
    """

    # input is a filelike object that feeds the input
    def __init__(self, input, filename=None):
        self.input = input
        # Take a single iterator up front so repeated read() calls continue
        # where the previous one stopped, even for re-iterable inputs such
        # as lists (iterating self.input directly would start over).
        self._chunks = iter(input)
        # Must be bytes: GzipFile writes bytes on Python 3. It must also be
        # set before GzipFile is created, since the header is written at once.
        self.buffer = b''
        self.zipper = GzipFile(filename, mode='wb', fileobj=self)

    def read(self, size=-1):
        """Return up to `size` compressed bytes (everything if size < 0)."""
        if size < 0 or len(self.buffer) < size:
            for s in self._chunks:
                self.zipper.write(s)
                if size > 0 and len(self.buffer) >= size:
                    self.zipper.flush()
                    break
            else:
                # Input exhausted: closing writes the gzip trailer. Calling
                # close() again on later reads is harmless.
                self.zipper.close()
        if size < 0:
            ret, self.buffer = self.buffer, b''
        else:
            ret, self.buffer = self.buffer[:size], self.buffer[size:]
        return ret

    def flush(self):
        """No-op; called by GzipFile on its fileobj."""
        pass

    def write(self, data):
        """Receive compressed output from GzipFile."""
        self.buffer += data

    def close(self):
        """Close the underlying input if it supports closing."""
        close_input = getattr(self.input, 'close', None)
        if close_input is not None:
            close_input()
Upvotes: 15
Reputation: 19145
Use the cStringIO (or StringIO) module in conjunction with zlib:
>>> import zlib
>>> from cStringIO import StringIO
>>> s = StringIO()
>>> s.write(zlib.compress("I'm a lumberjack"))
>>> s.seek(0)
>>> zlib.decompress(s.read())
"I'm a lumberjack"
Upvotes: 3