zot8

Reputation: 97

Why would processes in multiprocessing.Pool stop working / spawning?

For some reason, at the start of the script's execution (on macOS), I have 8 processes spawned and working in the multiprocessing Pool, but within several minutes of the start, only 1 working process is left.

I have this code (the real script is much bigger, but this should give the picture):

import json
import logging
import multiprocessing
import time

import gcsfs
import pandas as pd
from google.cloud import storage

def GetStatesDataset(dataset):
    # Read the states CSV from GCS into a DataFrame
    df_states = pd.read_csv(dataset)
    return df_states

def UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df):
    # Build the full destination table name and append the DataFrame to it
    table_name = table_name + prefix_name
    df.to_gbq(table_name,
              project_id=project_id,
              if_exists=if_exists)

def InitGetDataFromGCP(data, prefix):
    # List all blobs in the bucket under the given prefix
    client = storage.Client()
    files = []
    blobs = client.list_blobs(data, prefix=prefix)
    for blob in blobs:
        files.append(f'{data}/{blob.name}')
    return files

def GetDataFromGCP(file):
    start = time.perf_counter()
    fs = gcsfs.GCSFileSystem() # GCP's Google Cloud Storage (GCS) File System (FS)
    with fs.open(file, 'r') as f:
        # Reading newline-delimited JSON into a Pandas DataFrame
        gcs_data = [json.loads(line) for line in f]
        data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
        df = pd.DataFrame(data)

    # Attach the most recent state reached before each row's start_time
    df = pd.merge_asof(df,
                       df_states,
                       left_on="start_time",
                       right_on="state_reached_at",
                       by="car_id",
                       direction="backward")
    UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df)

    exec_time = time.perf_counter() - start
    logging.info(str(multiprocessing.current_process()) + 'Finished: execution time: ' + str(exec_time))

#######################

# Loaded at module level, so under spawn each worker re-runs this on import
df_states = GetStatesDataset('gs://link-to-my.csv')

dataset_name = 'one'
prefix_name = 'two'

# config for uploading data to BigQuery

table_name = 'one-two.'
project_id = 'one-two-three'

if_exists = 'append'


def main():
    files = InitGetDataFromGCP(dataset_name, prefix_name)

    with multiprocessing.Pool(processes=8) as pool:
        pool.map(GetDataFromGCP, files)

if __name__ == '__main__':
    main()

Since I'm logging everything, I can see all the processes at the start (everything is working fine):

2020-08-29 15:55:13,957 <SpawnProcess name='SpawnPoolWorker-8' parent=1420 started daemon>Finished: execution time: 22.53874
2020-08-29 15:55:15,947 <SpawnProcess name='SpawnPoolWorker-7' parent=1420 started daemon>Finished: execution time: 23.259828000000002
2020-08-29 15:55:17,219 <SpawnProcess name='SpawnPoolWorker-3' parent=1420 started daemon>Finished: execution time: 8.758934000000004
2020-08-29 15:55:19,094 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 7.409976
2020-08-29 15:55:21,755 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 0.25443099999999674

but after some time I get this:

2020-08-29 16:24:28,494 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 10.398635000000013
2020-08-29 16:24:36,077 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 4.782628999999929
2020-08-29 16:24:40,220 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.1638890000000401
2020-08-29 16:24:44,032 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.519871999999964
2020-08-29 16:24:50,449 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 3.1979730000000473

I can also confirm that only 1 process is working by looking at my CPU activity: there are 8 Python processes spawned, but only 1 is close to 100%. I am new to multiprocessing and may be missing something, but I want all 8 workers to keep performing tasks until my files run out.

Upvotes: 0

Views: 601

Answers (1)

zot8

Reputation: 97

It was so obvious: I just needed to specify a chunksize. By default, map splits the iterable into large chunks up front, so toward the end of the run the remaining chunks sit with a few workers while the rest go idle. Since I have almost 17000 files to process, one file at a time, chunksize=1 works like a charm:

with multiprocessing.Pool(processes=8) as pool:
    result = pool.map(GetDataFromGCP, files, chunksize=1)
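
For reference, here's a minimal sketch of how Pool.map picks a chunksize when none is given (it mirrors the divmod calculation in CPython's multiprocessing.pool; default_chunksize is just an illustrative helper name, and 17000/8 are the numbers from my case):

def default_chunksize(num_tasks, num_workers):
    # Mirrors multiprocessing.pool.Pool._map_async: the iterable is split
    # into roughly 4 chunks per worker, rounded up.
    chunksize, extra = divmod(num_tasks, num_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(17000, 8))  # 532, i.e. a worker grabs 532 files at once

With only about 32 such chunks in the whole run, the tail of the job inevitably ends up on a single worker. pool.imap_unordered(GetDataFromGCP, files) would give the same one-file-at-a-time scheduling, since its chunksize already defaults to 1.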

Upvotes: 0
