Ema Il

Reputation: 417

Python multithreading: threads suddenly stop

My program is supposed to take a dataset, split it into chunks, and run a couple of calculations per chunk.

The program runs fine without any parallelism, and also when I use multiprocessing per chunk with a nested process pool per calculation inside each chunk.

But when I use multiprocessing per chunk and multithreading per calculation type inside each chunk, sometimes not all of the threads complete before the program exits (with exit code 0).

The code is not really complicated. I do use AWS clients to access S3 and query Athena, but the fact that everything works with multiprocessing alone leads me to believe that it's not a connection problem.

Working code snippet:

class ChunkDataset(AthenaDataset):
    def _run_per_chunk(self, chunks: List[pd.DataFrame]) -> List[str]:
        created_tables_names_futures = []
        with ProcessPoolExecutor(max_workers=self.config[NUM_OF_WORKERS]) as process_executor:
            for chunk_num, chunk in enumerate(chunks):
                created_tables_names_futures.append(process_executor.submit(self.get_athena_datasets, chunk_num))

class AthenaDataset:
    def get_athena_datasets(self, chunk_num: int) -> str:
        historical_table_name = self._get_historical_table_name(chunk_num)
        self._get_datasets(historical_table_name, chunk_num)

    def _get_datasets(self, historical_table_name: str, chunk_num: int):
        with ProcessPoolExecutor(max_workers=len(self.config['data_types'])) as process_executor:
            for dataset_type in self.config[DATASET_TYPES]:
                process_executor.submit(self._get_dataset, historical_table_name, dataset_type, chunk_num)

When I switch the ProcessPoolExecutor in AthenaDataset to a ThreadPoolExecutor, it fails.

Does anyone have any idea why something like that could happen? (The function _get_dataset saves its results to a CSV file, so I don't need to call .result().)

Upvotes: 0

Views: 417

Answers (1)

leangaurav

Reputation: 443

According to the docs, leaving the executor's with block waits for all submitted tasks to finish, so the main process exits only after all workers have completed.

One situation that could cause this: if one of the workers raises an exception, it never runs to the end. The exception is stored on the Future returned by submit() and is not printed unless something asks for it, so whether we find out that something went wrong depends on logging, or on how we check that the worker completed.

For example, if a worker is supposed to download a file but fails with an exception, we only find out that the file was not downloaded after the main process has exited.
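One way to surface such a failure without blocking on each result is a done-callback that logs the stored exception. This is only a minimal sketch: download, run, and the file names are hypothetical stand-ins, not the code from the question.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("workers")


def download(name: str) -> str:
    """Hypothetical stand-in for a worker that fetches a file."""
    if name == "bad.csv":
        raise RuntimeError(f"could not fetch {name}")
    return name


def run(names):
    """Submit one task per name and return the exceptions that were raised."""
    failures = []

    def report(future: Future) -> None:
        # exception() returns None if the task succeeded
        err = future.exception()
        if err is not None:
            failures.append(err)
            log.error("worker failed: %r", err)

    with ThreadPoolExecutor(max_workers=2) as exe:
        for name in names:
            exe.submit(download, name).add_done_callback(report)
    # Leaving the with block waits for all tasks, so every callback has run.
    return failures
```

Calling run(["a.csv", "bad.csv"]) logs one error for bad.csv and returns a list holding that exception, instead of the failure passing silently.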

This example prints when each worker starts and finishes, and one of the workers raises an exception:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import sleep

def funct(i):
    print(f"start {i}")
    if i == 3:
        raise Exception()
    sleep(1)
    print(f"end {i}")
    return i

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as exe:
        f1 = exe.submit(funct,1)
        f2 = exe.submit(funct,2)
        f3 = exe.submit(funct,3)

Output (Notice there's no end 3 and no exception in output):

start 1
start 2
end 1
start 3
end 2

After modifying the main code to fetch each result:

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as exe:
        f1 = exe.submit(funct,1)
        f2 = exe.submit(funct,2)
        f3 = exe.submit(funct,3)

        for task in [f1,f2,f3]:
            try:
                print("Result of task: ", task.result())
            except Exception as err:
                print("Worker raised exception", err)

We get this output:

start 1
start 2
end 1
end 2
start 3
Result of task:  1
Result of task:  2
Worker raised exception

Now the error is visible (the message is empty because Exception() was raised without arguments).

Since the code in the question seems to produce a file, and success is judged by whether that file exists after execution, there is no way to know that an exception occurred if the _get_dataset() method doesn't log anything.

So to fix this, fetch the result of each future, or log each step in the _get_dataset method to determine which ones succeeded; I don't see any logging in the provided snippet.
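Applied to a thread pool with one task per dataset type, that could look roughly like the sketch below. The names get_dataset and get_datasets are hypothetical stand-ins for the methods in the question, and the "broken" type exists only to trigger a failure.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_dataset(dataset_type: str) -> None:
    """Hypothetical stand-in for _get_dataset; fails for one type."""
    if dataset_type == "broken":
        raise ValueError(f"query failed for {dataset_type}")


def get_datasets(dataset_types):
    """Run one task per dataset type and report which ones failed."""
    errors = {}
    with ThreadPoolExecutor(max_workers=len(dataset_types)) as exe:
        # Map each future back to the dataset type it was submitted for.
        futures = {exe.submit(get_dataset, t): t for t in dataset_types}
        for future in as_completed(futures):
            dataset_type = futures[future]
            try:
                future.result()  # re-raises the worker's exception, if any
            except Exception as err:
                errors[dataset_type] = err
                print(f"{dataset_type} failed: {err!r}")
    return errors
```

Even though the return value is not needed, calling result() is what makes a worker's exception visible in the calling thread.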

Upvotes: 2
