Reputation: 91939
Here is the situation:
I get gzipped XML documents from Amazon S3:
import boto
from boto.s3.connection import S3Connection
from boto.s3.key import Key
conn = S3Connection('access Id', 'secret access key')
b = conn.get_bucket('mydev.myorg')
k = Key(b)
k.key = 'documents/document.xml.gz'
I read them into a file like this:
import gzip
f = open('/tmp/p', 'wb')
k.get_file(f)
f.close()
r = gzip.open('/tmp/p', 'rb')
file_content = r.read()
r.close()
Question
How can I gunzip the streams directly and read the contents?
I do not want to create temp files; they don't look good.
Upvotes: 39
Views: 39865
Reputation: 817
You can try using a PIPE and read the contents without downloading the file:
import subprocess
c = subprocess.Popen('zcat -c <gzip file name>', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
for row in c.stdout:
    print(row)
In addition, "/dev/fd/" + str(c.stdout.fileno()) will give you the name of a FIFO (named pipe) that can be passed to another program.
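For example, a minimal sketch of handing that pipe to another command (the wc -l call is just an illustrative stand-in, and <gzip file name> remains a placeholder):
import subprocess
c = subprocess.Popen('zcat -c <gzip file name>', shell=True, stdout=subprocess.PIPE)
fifo_path = "/dev/fd/" + str(c.stdout.fileno())
# pass_fds keeps the descriptor inherited by the child, so the /dev/fd path is valid there
g = subprocess.Popen(['wc', '-l', fifo_path], pass_fds=(c.stdout.fileno(),))
g.wait()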
Upvotes: 0
Reputation: 27012
Here's a version based on the answer by Martijn Pieters, but it handles the case of multiple gzip streams concatenated together, which non-streaming solutions typically handle transparently.
import zlib
def gunzip(chunks):
    # 32 + MAX_WBITS tells zlib to expect and skip a gzip header
    dobj = zlib.decompressobj(32 + zlib.MAX_WBITS)
    for chunk in chunks:
        uncompressed_chunk = dobj.decompress(chunk)
        if uncompressed_chunk:
            yield uncompressed_chunk
        elif dobj.eof:
            # the current gzip member has ended: start a fresh decompressor
            # and feed it the leftover bytes of the next member
            unused = dobj.unused_data
            dobj = zlib.decompressobj(32 + zlib.MAX_WBITS)
            uncompressed_chunk = dobj.decompress(unused)
            if uncompressed_chunk:
                yield uncompressed_chunk
    uncompressed_chunk = dobj.flush()
    if uncompressed_chunk:
        yield uncompressed_chunk
Used as
uncompressed_chunks = gunzip(k)
For background: I have now seen this case in the wild more than once, in fact always in gzipped CSV files, where either just the header row is in its own gzip stream, or every X rows form their own gzip stream. Without handling this case, the streaming solution just seems to hang: bytes go in, but no bytes come out. That is because once eof is hit, decompress always returns the empty byte string, even if you pass it more valid gzip data.
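To see it cope with concatenated members, here is a small self-contained check I am adding for illustration (not part of the original answer); feeding one byte at a time just exercises the streaming path:
import gzip
# two independent gzip members glued together, like a header row and a body written separately
compressed = gzip.compress(b"header\n") + gzip.compress(b"row1\nrow2\n")
print(b"".join(gunzip(bytes([b]) for b in compressed)))  # b'header\nrow1\nrow2\n'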
Upvotes: 0
Reputation: 47354
I did it this way for gzip files:
import gzip
import boto3
s3 = boto3.resource('s3')
obj = s3.Object(bucket_name='Bucket', key='file.gz')
with gzip.GzipFile(fileobj=obj.get()["Body"]) as file:
    for line_bytes in file:
        print(line_bytes)
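If you want decoded text lines rather than raw bytes, one variation (my addition, assuming the object holds UTF-8 text) is to wrap the GzipFile in io.TextIOWrapper:
import io
with gzip.GzipFile(fileobj=obj.get()["Body"]) as gz:
    for line in io.TextIOWrapper(gz, encoding='utf-8'):
        print(line, end='')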
Upvotes: 0
Reputation: 1121834
Yes, you can use the zlib module to decompress byte streams:
import zlib
def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            yield rv
    if dec.unused_data:
        # decompress and yield the remainder
        yield dec.flush()
The offset of 32 signals to zlib that a gzip header is expected and should be skipped.
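As a quick illustration of that flag (my own sanity check, not part of the answer), the same wbits value lets the decompressor accept gzip-wrapped bytes directly:
import gzip
import zlib
compressed = gzip.compress(b"hello world")
dec = zlib.decompressobj(32 + zlib.MAX_WBITS)
print(dec.decompress(compressed))  # b'hello world'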
The S3 key object is an iterator, so you can do:
for data in stream_gzip_decompress(k):
    # do something with the decompressed data
Upvotes: 41
Reputation: 1613
For Python 3.x and boto3:
So I used BytesIO to read the compressed file into a buffer object, then used zipfile to open that buffer as an uncompressed stream, and I was able to read the data line by line.
import io
import zipfile
import boto3
import sys
s3 = boto3.resource('s3', 'us-east-1')
def stream_zip_file():
    obj = s3.Object(
        bucket_name='MonkeyBusiness',
        key='/Daily/Business/Banana/{current-date}/banana.zip'
    )
    buffer = io.BytesIO(obj.get()["Body"].read())
    print(buffer)
    z = zipfile.ZipFile(buffer)
    foo2 = z.open(z.infolist()[0])
    print(sys.getsizeof(foo2))
    line_counter = 0
    for _ in foo2:
        line_counter += 1
    print(line_counter)
    z.close()

if __name__ == '__main__':
    stream_zip_file()
Upvotes: 6
Reputation: 441
I had to do the same thing and this is how I did it:
import gzip
import StringIO
f = StringIO.StringIO()
k.get_file(f)
f.seek(0)  # this is crucial: rewind the buffer before GzipFile reads it
gzf = gzip.GzipFile(fileobj=f)
file_content = gzf.read()
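On Python 3 the same idea should work with io.BytesIO in place of StringIO (a sketch, assuming boto's get_file behaves the same there):
import io
import gzip
f = io.BytesIO()
k.get_file(f)
f.seek(0)  # rewind before handing the buffer to GzipFile
file_content = gzip.GzipFile(fileobj=f).read()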
Upvotes: 10