Reputation: 97
For some reason, when my script starts (on macOS), all 8 processes in the multiprocessing Pool are spawned and working, but a few minutes later only 1 process is still doing any work.
I have this code (the real script is much bigger, but this shows the picture):
import json
import logging
import multiprocessing
import time

import gcsfs
import pandas as pd
from google.cloud import storage

def GetStatesDataset(dataset):
    df_states = pd.read_csv(dataset)
    return df_states

def UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df):
    table_name = table_name + prefix_name
    pd.DataFrame.to_gbq(df,
                        table_name,
                        project_id=project_id,
                        if_exists=if_exists)

def InitGetDataFromGCP(data, prefix):
    # List every blob under the prefix and build its full GCS path
    client = storage.Client()
    files = []
    blobs = client.list_blobs(data, prefix=prefix)
    for blob in blobs:
        files.append(f'{data}/{blob.name}')
    return files

def GetDataFromGCP(file):
    # Timing added here as an assumption; the original excerpt omits how exec_time is computed
    start = time.perf_counter()
    fs = gcsfs.GCSFileSystem()  # GCP's Google Cloud Storage (GCS) File System (FS)
    with fs.open(file, 'r') as f:
        # Reading newline-delimited JSON into a list of records
        gcs_data = [json.loads(line) for line in f]
    data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
    df = pd.DataFrame(data)
    df = pd.merge_asof(df,
                       df_states,
                       left_on="start_time",
                       right_on="state_reached_at",
                       by="car_id",
                       direction="backward")
    UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df)
    exec_time = time.perf_counter() - start
    logging.info(str(multiprocessing.current_process()) + 'Finished: execution time: ' + str(exec_time))

#######################
# Module-level globals: re-created in every spawned worker when the module is imported
df_states = GetStatesDataset('gs://link-to-my.csv')
dataset_name = 'one'
prefix_name = 'two'
# config for uploading data to BigQuery
table_name = 'one-two.'
project_id = 'one-two-three'
if_exists = 'append'

def main():
    files = InitGetDataFromGCP(dataset_name, prefix_name)
    with multiprocessing.Pool(processes=8) as pool:
        pool.map(GetDataFromGCP, files)

if __name__ == '__main__':
    main()
Since I'm logging everything, I can see all the processes at the start (everything's working fine):
2020-08-29 15:55:13,957 <SpawnProcess name='SpawnPoolWorker-8' parent=1420 started daemon>Finished: execution time: 22.53874
2020-08-29 15:55:15,947 <SpawnProcess name='SpawnPoolWorker-7' parent=1420 started daemon>Finished: execution time: 23.259828000000002
2020-08-29 15:55:17,219 <SpawnProcess name='SpawnPoolWorker-3' parent=1420 started daemon>Finished: execution time: 8.758934000000004
2020-08-29 15:55:19,094 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 7.409976
2020-08-29 15:55:21,755 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 0.25443099999999674
but after some time I get this:
2020-08-29 16:24:28,494 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 10.398635000000013
2020-08-29 16:24:36,077 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 4.782628999999929
2020-08-29 16:24:40,220 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.1638890000000401
2020-08-29 16:24:44,032 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.519871999999964
2020-08-29 16:24:50,449 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 3.1979730000000473
I can also confirm that only 1 process is working by looking at my CPU activity: there are 8 Python processes spawned, but only 1 is close to 100%. I am new to multiprocessing and maybe I don't know what I'm doing, but I want all 8 workers to keep performing tasks until the "files" list is exhausted.
Upvotes: 0
Views: 601
Reputation: 97
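It was so obvious. I just needed to specify a chunksize. When chunksize is omitted, pool.map splits the input into a few large chunks and hands each worker a whole chunk at a time, so presumably the workers that finished their chunks early sat idle while one worker was still grinding through a slow chunk. Since I have almost 17000 files to process and want each worker to take one file at a time, chunksize=1 seems to work like a charm: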
with multiprocessing.Pool(processes=8) as pool:
    result = pool.map(GetDataFromGCP, files, chunksize=1)
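To put rough numbers on it, here is a minimal sketch of how the default chunk size works out for this workload. It assumes the current CPython behaviour of Pool.map, which (when chunksize is omitted) divides the input length by four times the number of workers and rounds up. The helper names default_chunksize, count_chunks and process_all are made up for illustration and are not part of multiprocessing.

```python
import multiprocessing


def default_chunksize(n_items, n_workers):
    """Chunk size Pool.map picks when chunksize is omitted (assumed CPython behaviour)."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def count_chunks(n_items, chunksize):
    """Number of tasks the pool actually schedules for a given chunk size."""
    return -(-n_items // chunksize)  # ceiling division


def process_all(func, items, n_workers=8):
    """Hypothetical wrapper: one item per task, so an idle worker always picks up the next item."""
    with multiprocessing.Pool(processes=n_workers) as pool:
        return pool.map(func, items, chunksize=1)


if __name__ == "__main__":
    size = default_chunksize(17000, 8)
    print(size)                       # files per task with the default
    print(count_chunks(17000, size))  # tasks shared among the 8 workers
    print(count_chunks(17000, 1))     # tasks with chunksize=1
```

With roughly 17000 files and 8 workers, the default gives chunks of several hundred files each, so a single chunk full of large files can keep one worker busy long after the others have run out of work. chunksize=1 costs a bit more inter-process overhead per file, which is negligible here because each file takes seconds to process. If the order of results does not matter, pool.imap_unordered(func, items, chunksize=1) is an alternative that yields results as they complete.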
Upvotes: 0