pcotte

Reputation: 81

Using GCSFileSystem with multiprocessing

I am trying to run a program that uses gcsfs.GCSFileSystem to access Google Cloud Storage from worker processes created with Python's concurrent.futures.ProcessPoolExecutor.

The code to run is actually very complex, but I managed to boil it down to this minimal non-working example:

from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem


def f(path):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(main_fs.glob(path))
    print("Done!")


if __name__ == "__main__":

    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))

    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in (pool.map(f, ["code_tests_sand"])):
            l_.append(0)

I expect:

['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...
['code_tests_sand']
Done!

I get:

['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...

The program then hangs at this point and never finishes.

I found a way to get the expected output by explicitly passing the GCSFileSystem object to the function:

from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem


def f(path, ff):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(ff.glob(path))
    print("Done!")


if __name__ == "__main__":

    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))

    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in (pool.map(f, ["code_tests_sand"], [main_fs])):
            l_.append(0)

However, that is not a good solution for me, because I will not be able to do that in my real code. Any idea WHY this is happening, and how I could work around it?
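A likely explanation, offered as an assumption rather than something confirmed in this thread: gcsfs 0.7.x (through fsspec's async support) runs its requests on an asyncio event loop that lives in a background thread. On Linux, ProcessPoolExecutor forks its workers by default, and fork() copies only the calling thread, so in the worker the inherited main_fs still points at that loop but nothing is running it, and glob() waits forever. The standard-library sketch below reproduces that effect; LoopRunner, fake_glob and call_in_forked_child are hypothetical names, not gcsfs APIs, and os.fork() requires a Unix system.

```python
import asyncio
import concurrent.futures
import os
import threading


class LoopRunner:
    """Hypothetical stand-in for an async-backed filesystem: it owns an event
    loop running forever in a daemon thread, and its blocking calls hand
    coroutines to that thread and wait for the result."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def run(self, coro, timeout=None):
        # Schedule the coroutine on the loop thread and block until it is done.
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout)


async def fake_glob(path):
    # Hypothetical coroutine standing in for a network call.
    await asyncio.sleep(0)
    return [path]


def call_in_forked_child(runner, path, timeout=1.0):
    """Fork, call runner.run() in the child, and return "ok" or "timeout"."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child: only the thread that called fork() exists here
        os.close(read_fd)
        outcome = b"error"
        try:
            runner.run(fake_glob(path), timeout)
            outcome = b"ok"
        except concurrent.futures.TimeoutError:
            # The inherited loop object is never run, so the call cannot finish.
            outcome = b"timeout"
        finally:
            os.write(write_fd, outcome)
            os._exit(0)  # never let the child continue running the parent's code
    os.close(write_fd)
    outcome = os.read(read_fd, 16).decode()
    os.close(read_fd)
    os.waitpid(pid, 0)
    return outcome
```

In the parent, runner.run(fake_glob("code_tests_sand")) returns ['code_tests_sand']; in the forked child the same call only returns because of the one-second timeout. Without a timeout it would block forever, much like the glob() call in the question.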

FYI, I am running Ubuntu 18 with Python 3.8, and here is my pip freeze output:

aiohttp==3.7.3
async-timeout==3.0.1
attrs==20.3.0
cachetools==4.2.1
certifi==2020.12.5
chardet==3.0.4
decorator==4.4.2
fsspec==0.8.5
gcsfs==0.7.2
google-auth==1.27.0
google-auth-oauthlib==0.4.2
idna==2.10
multidict==5.1.0
oauthlib==3.1.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
requests==2.25.1
requests-oauthlib==1.3.0
rsa==4.7.1
six==1.15.0
typing-extensions==3.7.4.3
urllib3==1.26.3
yarl==1.6.3

Upvotes: 2

Views: 1808

Answers (2)

Ian Wilson

Reputation: 11

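This issue is related to Linux's default multiprocessing settings: on Linux the default start method (in Python 3.8) is fork, whereas on macOS it has been spawn since Python 3.8. Multiprocessing worked as expected on my Mac, but when deployed on a Linux VM it was unable to even initialize gcsfs. The following article pointed me to the issue.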

https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn/

The simplest solution I found was to set the multiprocessing start method to forkserver:

import multiprocessing as mp
...
...
...
if __name__ == '__main__':
    mp.set_start_method('forkserver')  # call once, before any worker process is started

Upvotes: 1

pcotte

Reputation: 81

I eventually found a workaround: a class wrapped around GCSFileSystem:

from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem
from copy import copy
import sys


class Dummy:
    fs = None

    @classmethod
    def set_fs(cls, fs):
        cls.fs = fs

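    def __init__(self, path):
        # Copy the class-level filesystem for each instance; without this
        # copy, instances created in worker processes hang (see note below).
        self.fs = copy(Dummy.fs)
        self.path = path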

    def glob(self):
        return self.fs.glob(self.path)


def f(path):
    print(f"Creating {path}...")
    p = Dummy(path)
    print("Created. Getting glob...")
    print(p.glob())
    print(sys.getsizeof(p.fs))
    print("Done!")


if __name__ == "__main__":

    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))

    Dummy.set_fs(main_fs)

    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in (pool.map(f, ["code_tests_sand"])):
            l_.append(0)

Note that the GCSFileSystem object must be copied in each class instantiation. Without the copy, the class works fine as long as it is not used in worker processes, but shows the same problematic behavior when it is. sys.getsizeof reports only about 50 bytes for the GCSFileSystem object here, so copying it should not impact memory too much, although getsizeof measures only the object itself and not the objects it references.
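Why the copy matters, as far as I understand fsspec (my reading, not verified against gcsfs 0.7.2): fsspec filesystems define __reduce__ so that pickling, and therefore copy.copy(), rebuilds the object by calling the constructor again, and fsspec's instance cache is discarded in a new process. A copy made inside the worker would then get its own event loop thread, while the object inherited through fork does not. The same mechanism would explain why passing main_fs through pool.map() works: the argument is pickled and rebuilt in the worker. The standard-library sketch below models this; ReconstructibleRunner, fake_glob and glob_in_forked_child are hypothetical names, and os.fork() requires a Unix system.

```python
import asyncio
import concurrent.futures
import copy
import os
import threading


class ReconstructibleRunner:
    """Hypothetical stand-in for an fsspec-style filesystem: it owns an event
    loop thread, and copy()/pickle rebuild it by calling the constructor
    again instead of duplicating the loop and thread."""

    def __init__(self, project=None):
        self.project = project  # hypothetical constructor argument
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def __reduce__(self):
        # copy.copy() and pickle both use this: "call the class with these args".
        return (type(self), (self.project,))

    def run(self, coro, timeout=None):
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout)


async def fake_glob(path):
    await asyncio.sleep(0)
    return [path]


def glob_in_forked_child(runner, path, make_copy, timeout=1.0):
    """Fork and glob in the child, optionally through a copy made there."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process
        os.close(read_fd)
        outcome = b"error"
        try:
            # Like Dummy.__init__: copy the inherited object inside the worker.
            fs = copy.copy(runner) if make_copy else runner
            if fs.run(fake_glob(path), timeout) == [path]:
                outcome = b"ok"
        except concurrent.futures.TimeoutError:
            outcome = b"timeout"
        finally:
            os.write(write_fd, outcome)
            os._exit(0)
    os.close(write_fd)
    outcome = os.read(read_fd, 16).decode()
    os.close(read_fd)
    os.waitpid(pid, 0)
    return outcome
```

With make_copy=False the child times out; with make_copy=True the copy starts its own loop thread in the child and the glob succeeds, mirroring the behavior described above.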

Upvotes: 2
