Niels

Reputation: 31

Extracting Gzip files from requests with multithreading

I'm trying to extract a bunch (62,000) of gzip files, each containing a text document formatted as JSON. Right now I'm downloading all of these files with the requests module, using multithreading:

import gzip
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_file(url, filename):
    try:
        html = requests.get(url, stream=True, allow_redirects=True)
        # Save the raw (still gzip-compressed) body to disk.
        with open('Streams Total Daily/' + filename + '.json.gz', 'wb') as out:
            out.write(html.content)
        return html.status_code
    except requests.exceptions.RequestException as e:
        return e

def get_streams():
    threads = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for uri in country_uris:
            split = uri.split('/')
            filename = 'streams_'+split[1] + '_' +split[4]+'_'+split[5]+'_'+split[6] + '_'+ split[7]
            url = f"{link}{uri}?access_token={access_token}"
            threads.append(executor.submit(fetch_file,url,filename))
        
        for task in as_completed(threads):
            print(task.result())

get_streams()
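
(Side note: even though stream=True is passed, html.content still buffers the whole body in memory before writing. If memory ever becomes a concern, I imagine a chunked write would look something like this sketch; I haven't benchmarked it:)

def fetch_file_chunked(url, filename):
    # Hypothetical variant: stream the body to disk in chunks instead of
    # buffering it all at once via html.content.
    try:
        with requests.get(url, stream=True, allow_redirects=True) as html:
            with open('Streams Total Daily/' + filename + '.json.gz', 'wb') as out:
                for chunk in html.iter_content(chunk_size=8192):
                    out.write(chunk)
            return html.status_code
    except requests.exceptions.RequestException as e:
        return e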

I have some code that can loop through the folder where the files are placed and extract them afterwards, but that takes a long time with 62,000 files. I've also tried some versions that pass response.content through gzip.GzipFile(), but these just give me empty files:

def fetch_file(url, filename):
    try:
        html = requests.get(url, stream=True, allow_redirects=True)
        # This is the part that doesn't work: gz.read() comes back empty.
        gz = gzip.GzipFile(fileobj=html.content)
        with open('test/' + filename + '.json', 'wb') as out:
            out.write(gz.read())
        return html.status_code
    except requests.exceptions.RequestException as e:
        return e

def get_streams():
    threads = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for uri in country_uris:
            split = uri.split('/')
            filename = 'streams_'+split[1] + '_' +split[4]+'_'+split[5]+'_'+split[6] + '_'+ split[7]
            url = f"{link}{uri}?access_token={access_token}"
            threads.append(executor.submit(fetch_file,url,filename))
        
        for task in as_completed(threads):
            print(task.result())

get_streams()
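
My current guess is that gzip.GzipFile wants a file-like object for its fileobj parameter, while html.content is plain bytes, so something like the following might be closer (an untested sketch, wrapping the bytes in io.BytesIO):

import gzip
import io

def fetch_file(url, filename):
    try:
        html = requests.get(url, allow_redirects=True)
        # Guess: wrap the raw bytes so GzipFile gets a file-like object.
        gz = gzip.GzipFile(fileobj=io.BytesIO(html.content))
        with open('test/' + filename + '.json', 'wb') as out:
            out.write(gz.read())
        return html.status_code
    except requests.exceptions.RequestException as e:
        return e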

Does anyone have an idea on how to handle this? Any suggestions or solutions are much appreciated!

Upvotes: 0

Views: 432

Answers (1)

Niels

Reputation: 31

What worked for me in the end was the zlib module: first getting a bytes response with response.content, then decompressing the data with zlib.decompress(data, 16 + zlib.MAX_WBITS), and finally writing the decompressed data to a .json file:

import zlib
import requests
from concurrent.futures import ThreadPoolExecutor

def get_files(i):
    # `link` is the base URL, as in the question; reusing `url` on both sides
    # of the assignment would raise an UnboundLocalError.
    url = f"{link}{i}"
    elements = i.split('/')
    name = elements[1] + '_' + elements[3] + '_' + elements[4] + '_' + elements[5] + '_' + elements[6] + '_' + elements[7]
    try:
        response = requests.get(url=url, headers=headers, allow_redirects=True).content
        # 16 + zlib.MAX_WBITS tells zlib to expect a gzip header and trailer.
        decompressed_data = zlib.decompress(response, 16 + zlib.MAX_WBITS)
        with open(f"Streams Total Daily/{name}.json", 'wb') as out:
            out.write(decompressed_data)
    except requests.exceptions.RequestException as e:
        return e

def runner():
    threads = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for i in country_files:
            threads.append(executor.submit(get_files, i))

runner()
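
Note that zlib.decompress(data, 16 + zlib.MAX_WBITS) should be equivalent to the stdlib shortcut gzip.decompress(data), which may read more clearly; I just happened to reach for zlib first:

import gzip

# Equivalent to zlib.decompress(response, 16 + zlib.MAX_WBITS):
decompressed_data = gzip.decompress(response)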

Upvotes: 1
