Reputation: 280
I am attempting to combine multiple gzip
streams into a single stream, my understanding is this should be possible but my implementation is flawed.
Based on what I have read, my expectation was that I should be able to remove the 10-byte header and 8-byte footer from all streams, concatenate the bytes together, and reconstruct a header and footer.
However, when I try to do this the decompression operation fails; I am assuming this is because the .flush()
call is including some information in the block about "end of data" that is not being removed.
It is possible to concatenate multiple gzip
streams together without altering them. This is a valid gzip
file containing multiple streams.
Unfortunately, when using zlib.decompress(data, GZIP_WBITS)
, rather than using decompressobj
to check for an unconsumed_tail
, only the first stream is returned.
Example to show how concatenation might break some downstream clients consuming these files.
import zlib

# Passing 16 + MAX_WBITS as wbits selects the gzip container
# (10-byte header plus CRC-32/length trailer) instead of a raw zlib stream.
GZIP_WBITS = 16 + zlib.MAX_WBITS
def decompress(data: bytes) -> bytes:
    """Inflate a gzip-framed buffer in a single call."""
    inflated = zlib.decompress(data, GZIP_WBITS)
    return inflated
def compress(data: list[bytes]) -> bytes:
    """Compress each chunk as its own complete gzip member and join them.

    Every chunk gets a fresh compressor, so the result is a sequence of
    independent, fully-terminated gzip streams back to back.
    """
    members = []
    for chunk in data:
        gz = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
        members.append(gz.compress(chunk) + gz.flush())
    return b"".join(members)
def test_decompression():
    """Show that single-shot decompression only yields the first member."""
    chunks = [b"Hello", b"World!"]
    result = decompress(compress(chunks))
    # Ideally this would be b"".join(chunks) == result, but
    # zlib.decompress stops after the first gzip member.
    assert result == chunks[0]
import zlib
import struct

# wbits value selecting the gzip wrapper (header + CRC-32/ISIZE trailer).
GZIP_WBITS = 16 + zlib.MAX_WBITS

test_bytes = b"hello world"

# Create a GZIP example stream.
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
single = deflate.compress(test_bytes)
single += deflate.flush()
# Quick sanity check that decompression works.
zlib.decompress(single, GZIP_WBITS)
print("Single:", single.hex())

# Check our understanding of the footer: the last 4 bytes are ISIZE
# (uncompressed length mod 2**32) and the 4 before them the CRC-32,
# both little-endian (RFC 1952).
single_len = struct.unpack("<I", single[-4:])[0]
assert single_len == len(test_bytes), "wrong len"
single_crc = struct.unpack("<I", single[-8:-4])[0]
assert single_crc == zlib.crc32(test_bytes), "wrong crc"

# Create an example GZIP stream with duplicated input bytes.
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
double = deflate.compress(test_bytes)
double += deflate.compress(test_bytes)
double += deflate.flush()
# Quick sanity check that decompression works.
zlib.decompress(double, GZIP_WBITS)
# Check we can calculate the length and CRC correctly.
double_length = struct.unpack("<I", double[-4:])[0]
assert double_length == len(test_bytes + test_bytes), "wrong len"
double_crc = struct.unpack("<I", double[-8:-4])[0]
assert double_crc == zlib.crc32(test_bytes + test_bytes), "wrong crc"
print(f"Double: {double.hex()}")

# Remove the 10-byte header and 8-byte footer from the original stream.
single_data = single[10:-8]
print(f" Data: {' '*20}{single_data.hex()}")
# Concatenate the original stream (footer removed) with a headerless,
# footerless duplicate, then append a rebuilt CRC/ISIZE footer.
concatenated = single[:-8] + single_data
concatenated += struct.pack("<I", double_crc)
concatenated += struct.pack("<I", double_length)
assert concatenated.startswith(single[:-8])
print(f" Maybe: {concatenated.hex()}")

# Confirm this is bad data: the first member's deflate data already ends
# with its final-block bit set, so inflation stops at the splice point and
# the following bytes fail the trailer (CRC/length) check.
try:
    zlib.decompress(concatenated, GZIP_WBITS)
except zlib.error as exc:
    print("Confirmed invalid:", exc)
else:
    raise AssertionError("expected the spliced stream to fail")
My assumption is it will be possible to use the following function to combine the crc32 values:
def crc_combine(crcA, crcB, lenB):
    """Return the CRC-32 of A||B given crc32(A), crc32(B) and len(B).

    Runs lenB zero bytes through the CRC register seeded from crcA; the
    pre/post inversions cancel, so XOR-ing with crcB gives the combined CRC.
    """
    shifted = zlib.crc32(b'\0' * lenB, crcA ^ 0xffffffff) ^ 0xffffffff
    return shifted ^ crcB
This is a simplified version of the crc32_combine
function. Note that I cannot change downstream consumers away from zlib.decompress(data, GZIP_WBITS)
, as the resultant files form part of a "public interface" and this would be considered a breaking change.Upvotes: 1
Views: 83
Reputation: 280
Based on the clarification from @MarkAdler about the last-block bit and the byte boundary (and his many other supporting answers) I have stitched together this relatively simple version.
As I understand it, this approach can work as I have control over the creation of the original gzip
streams. Otherwise, I would have to go twiddling bits like gzjoin does.
import struct
import zlib
from typing import TypedDict

# wbits value selecting the gzip wrapper (RFC 1952 header/trailer)
# rather than a raw zlib stream.
GZIP_WBITS = 16 + zlib.MAX_WBITS
class PrepareRet(TypedDict):
    """A compressed stream fragment plus the metadata needed to splice it."""

    # gzip header + deflate payload, left byte-aligned by Z_SYNC_FLUSH
    data: bytes
    # uncompressed length of the original input
    length: int
    # CRC-32 of the original uncompressed input
    crc: int
def decompress(data: bytes) -> bytes:
    """Single-shot inflate of a gzip-wrapped payload."""
    return zlib.decompress(data, GZIP_WBITS)
def prepare(data: bytes) -> PrepareRet:
    """Compress *data* as an unterminated gzip stream plus its metadata."""
    gz = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
    # Z_SYNC_FLUSH byte-aligns the deflate output without setting the
    # final-block bit, so streams prepared this way can be concatenated.
    body = gz.compress(data) + gz.flush(zlib.Z_SYNC_FLUSH)
    return {
        "data": body,
        "length": len(data),
        "crc": zlib.crc32(data),
    }
def crc_combine(crcA: int, crcB: int, lenB: int):
    """Combine two CRC-32s: the CRC of A||B from crc32(A), crc32(B), len(B).

    Advancing crcA through lenB zero bytes shifts it past B's position;
    the inversion constants cancel, leaving a plain XOR with crcB.
    """
    extended = zlib.crc32(b"\0" * lenB, crcA ^ 0xFFFFFFFF) ^ 0xFFFFFFFF
    return extended ^ crcB
def concatenate(streams: list[PrepareRet]) -> bytes:
    """Splice prepared (Z_SYNC_FLUSH-terminated) gzip fragments into one member.

    Each fragment's deflate payload is appended after stripping its fixed
    10-byte gzip header; a fresh header and a Z_FINISH trailer carrying the
    combined CRC-32 and total length wrap the result so standard one-shot
    decompression returns the full concatenated payload.
    """
    body = bytearray()
    total_length = 0
    combined_crc = 0
    for stream in streams:
        # Remove the 10-byte gzip header, keeping the raw deflate bytes.
        body += stream["data"][10:]
        total_length += stream["length"]
        combined_crc = crc_combine(combined_crc, stream["crc"], stream["length"])
    # Compress nothing to borrow a valid gzip header and a final (Z_FINISH)
    # empty deflate block that terminates the spliced stream.
    compressor = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
    empty = compressor.flush()
    gzip_header = empty[:10]
    # The closing block (with the last-block bit set), minus the 8-byte footer.
    trailer = empty[10:-8]
    # CRC-32 and ISIZE are 32-bit little-endian; ISIZE is the uncompressed
    # length modulo 2**32 (RFC 1952), so mask to avoid struct.error overflow.
    trailer += struct.pack("<L", combined_crc & 0xFFFFFFFF)
    trailer += struct.pack("<L", total_length & 0xFFFFFFFF)
    return gzip_header + bytes(body) + trailer
def test_equal():
    """Round-trip: concatenated prepared streams decompress to the joined input."""
    chunks = [b"Hello", b"World!"]
    combined = concatenate([prepare(chunk) for chunk in chunks])
    assert decompress(combined) == b"".join(chunks)
Upvotes: 0
Reputation: 112502
This is much easier than you're making it out to be. Simply concatenate the gzip files without removing or in any way messing with the headers and trailers. Any concatenation of gzip streams is a valid gzip stream, and will decompress to the concatenation of the uncompressed contents of the individual gzip streams.
Upvotes: 4