Reputation: 81
I am trying to run a program that uses gcsfs.GCSFileSystem to access Google Cloud Storage from within Python's concurrent.futures.ProcessPoolExecutor.
The actual code is quite complex, but I managed to boil it down to this minimal non-working example:
from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem

def f(path):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(main_fs.glob(path))
    print("Done!")

if __name__ == "__main__":
    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))
    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in pool.map(f, ["code_tests_sand"]):
            l_.append(0)
I expect:
['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...
['code_tests_sand']
Done!
I get:
['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...
The program gets stuck at this point and never finishes.
I found a way to get the expected output by explicitly passing the GCSFileSystem object to the function:
from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem

def f(path, ff):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(ff.glob(path))
    print("Done!")

if __name__ == "__main__":
    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))
    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in pool.map(f, ["code_tests_sand"], [main_fs]):
            l_.append(0)
However, this is not a good solution for me, as I will not be able to do that in my real code. Any idea WHY this is happening and how I could work around it?
FYI, I am running Ubuntu 18 and Python 3.8; here is my pip freeze output:
aiohttp==3.7.3
async-timeout==3.0.1
attrs==20.3.0
cachetools==4.2.1
certifi==2020.12.5
chardet==3.0.4
decorator==4.4.2
fsspec==0.8.5
gcsfs==0.7.2
google-auth==1.27.0
google-auth-oauthlib==0.4.2
idna==2.10
multidict==5.1.0
oauthlib==3.1.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
requests==2.25.1
requests-oauthlib==1.3.0
rsa==4.7.1
six==1.15.0
typing-extensions==3.7.4.3
urllib3==1.26.3
yarl==1.6.3
Upvotes: 2
Views: 1808
Reputation: 11
This issue is related to Linux's default settings. Multiprocessing worked as expected on my Mac, but when deployed on a Linux VM it was unable to even initialize gcsfs. The following article pointed me to the issue:
https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn/
The simplest solution I found was to set the start method explicitly:
import multiprocessing as mp
...
...
...
if __name__ == '__main__':
    mp.set_start_method('forkserver')
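If changing the global default is not desirable, the start method can also be selected per executor through the mp_context argument of ProcessPoolExecutor (available since Python 3.7). Below is a minimal sketch of that idea, assuming default credentials are available inside the worker processes; it creates the filesystem in the worker instead of relying on a global:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem

def f(path):
    # forkserver/spawn children do not inherit the parent's GCSFileSystem,
    # so build the filesystem inside the worker.
    fs = GCSFileSystem()
    return fs.glob(path)

if __name__ == "__main__":
    ctx = mp.get_context("forkserver")  # or "spawn"
    with ProcessPoolExecutor(max_workers=10, mp_context=ctx) as pool:
        for result in pool.map(f, ["code_tests_sand"]):
            print(result)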
Upvotes: 1
Reputation: 81
I eventually found a workaround, in the form of a class wrapped around GCSFileSystem:
from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem
from copy import copy
import sys

class Dummy:
    fs = None

    @classmethod
    def set_fs(cls, fs):
        cls.fs = fs

    def __init__(self, path):
        self.fs = copy(Dummy.fs)
        self.path = path

    def glob(self):
        return self.fs.glob(self.path)

def f(path):
    print(f"Creating {path}...")
    p = Dummy(path)
    print("Created. Getting glob...")
    print(p.glob())
    print(sys.getsizeof(p.fs))
    print("Done!")

if __name__ == "__main__":
    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))
    Dummy.set_fs(main_fs)
    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in pool.map(f, ["code_tests_sand"]):
            l_.append(0)
Note that it is required to copy the GCSFileSystem object in each class instantiation. Without the copy, the class works fine as long as it is not multiprocessed, but shows the same problematic hang when it is. The GCSFileSystem here only weighs around 50 bytes, so copying it should not impact memory too much.
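An alternative to copying, not part of the original answer but a sketch of the same idea, is to create the GCSFileSystem lazily inside each worker process, so that no instance built in the parent process is ever reused in a child:

import os
from gcsfs import GCSFileSystem

_fs = None
_fs_pid = None

def get_fs():
    # Build a new filesystem the first time it is needed in a given process;
    # an instance inherited from the parent process is never reused.
    global _fs, _fs_pid
    if _fs is None or _fs_pid != os.getpid():
        _fs = GCSFileSystem()
        _fs_pid = os.getpid()
    return _fs

Workers would then call get_fs().glob(path) instead of touching a global or class-level instance.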
Upvotes: 2