Harry de winton

Reputation: 1069

multiprocessing - processes won't join?

TL;DR - the consumer processes finish but do not join; no errors are raised, and the script runs forever, stuck in limbo on the join statement.

I am aiming to speed up a data-retrieval process, but I do not know in advance how many 'tasks' (pieces of data to retrieve) there will be. So I made a modified version of the poison-pill method, in which the task recognises when it is no longer retrieving information and triggers the poison-pill branch itself.

I have posted a proof, which is a minimal working example of my poison-pill method, and the full script. (Both should run as-is.)

proof:

import multiprocessing


class Task:
    def __init__(self, number):
        self.number = number

    def __call__(self):
        """Find officer and company data and combine and save it"""

        try:
            # 'gather some data!'
            self.result = self.number*2
            print(self.number)
            # 'fake' finding no data
            if self.result >= 8:
                raise NameError
        except NameError:
            # become poison pill once latest is done
            self.result = None

    def output(self):
        return self.result


class Consumer(multiprocessing.Process):
    """Handle process and re-queue complete tasks"""
    def __init__(self, waiting_queue, complete_queue):
        multiprocessing.Process.__init__(self)
        self.waiting_queue = waiting_queue
        self.complete_queue = complete_queue

    def run(self):
        """process tasks until queue is empty"""
        proc_name = self.name
        while True:
            current_task = self.waiting_queue.get()
            current_task()
            if current_task.output() is None:
                print('{}: Exiting, poison pill reached'.format(proc_name))
                self.waiting_queue.task_done()
                break
            self.waiting_queue.task_done()
            self.complete_queue.put(current_task)
        print('{}: complete'.format(proc_name))


class Shepard:
    """Handle life cycle of Consumers, Queues and Tasks"""
    def __init__(self):
        pass

    def __call__(self, start_point):

        # initialize queues
        todo = multiprocessing.JoinableQueue()
        finished = multiprocessing.JoinableQueue()

        # start consumers
        num_consumers = multiprocessing.cpu_count() * 2
        consumers = [Consumer(todo, finished) for i in range(num_consumers)]
        for q in consumers:
            q.start()

        # decide on a (max) end limit (make it much longer than the suspected amount of data to be gathered)
        start = int(start_point)
        max_record_range = 100
        end = start + max_record_range

        # Enqueue jobs
        for i in range(start, end):
            todo.put(Task(i))
        print('Processes joining')
        # wait for processes to join
        for p in consumers:
            p.join()
        print('Processes joined')

        # process results - UNFINISHED
        pass

        # return results - UNFINISHED
        return 'results!'


if __name__ == '__main__':

    # load start points:
    start_points = {'cat1': 1, 'cat2': 3, 'cat3': 4}


    master = Shepard()
    cat1 = master(start_points['cat1'])
    print('cat1 done')
    cat2 = master(start_points['cat2'])
    print('cat2 done')
    cat3 = master(start_points['cat3'])

So here is the full script:

import time
import requests
import sys
import json
import pandas as pd
import multiprocessing
import queue


class CompaniesHouseRequest:
    """Retreive information from Companies House"""
    def __init__(self, company, catagory_url=''):
        """Example URL: '/officers'"""
        self.company = str(company)
        self.catagory_url = str(catagory_url)

    def retrieve(self, key='Rn7RLDV9Tw9v4ShDCotjDtJFBgp1Lr4d-9GRYZMo'):
        """retrieve data from Companies House"""
        call = 'https://api.companieshouse.gov.uk/company/' + self.company + self.catagory_url
        retrieve_complete = False
        while retrieve_complete is False:
            resp = requests.get(call, auth=requests.auth.HTTPBasicAuth(key, ''))
            code = resp.status_code
            if code == 404:
                print(resp.status_code)
                raise NameError('Company not found')
            elif code == 200:
                try:
                    self.data = json.loads(resp.content.decode('UTF8'))
                    retrieve_complete = True
                except json.decoder.JSONDecodeError:
                    print('Decode Error in Officers!')
            else:
                print('Error: status code', code)
                print('Retrying')
                time.sleep(5)
        return self.data


class Company:
    """Retrieve and hold company details"""
    def __init__(self, company_number):
        self.company_number = company_number

    def __call__(self):
        """Create request and process data"""
        # make request
        req = CompaniesHouseRequest(self.company_number)
        data = req.retrieve()
        # extract data
        try:
            line = [self.company_number,
                    data['company_name'],
                    data['registered_office_address'].get('premises', ''),
                    data['registered_office_address'].get('address_line_1', ''),
                    data['registered_office_address'].get('address_line_2', ''),
                    data['registered_office_address'].get('country', ''),
                    data['registered_office_address'].get('locality', ''),
                    data['registered_office_address'].get('postal_code', ''),
                    data['registered_office_address'].get('region', '')]
        except KeyError:
            line = ['' for i in range(0, 9)]
        # save as pandas dataframe
        return pd.DataFrame([line], columns=['company_number', 'company_name', 'company_address_premises',
                                             'company_address_line_1', 'company_address_line_2',
                                             'company_address_country', 'company_address_locality',
                                             'company_address_postcode', 'company_address_region'])


def name_splitter(name):
    """Split a 'SURNAME, Forename, Title' string into [title, forename, surname]"""
    split = name.split(', ')
    if len(split) > 2:
        return [split[2], split[1], split[0]]
    elif len(split) == 2:
        return ['', split[1], split[0]]
    else:
        # single-part name: avoid an IndexError
        return ['', '', split[0]]


class Officers:
    """Retrieve and hold officers details"""
    def __init__(self, company_number):
        self.company_number = company_number

    def __call__(self):
        """Create request and process data"""
        # make request
        req = CompaniesHouseRequest(self.company_number, '/officers')
        data = req.retrieve()
        # extract data
        for officer in data['items']:
            if officer['officer_role'] == 'director':
                name = name_splitter(officer['name'])
                line = [name[0],
                        name[1],
                        name[2],
                        officer.get('occupation'),
                        officer.get('country_of_residence'),
                        officer.get('nationality'),
                        officer.get('appointed_on', ''),
                        officer['address'].get('premises', ''),
                        officer['address'].get('address_line_1', ''),
                        officer['address'].get('address_line_2', ''),
                        officer['address'].get('country', ''),
                        officer['address'].get('locality', ''),
                        officer['address'].get('postal_code', ''),
                        officer['address'].get('region', '')]
                break
        director_count = sum(map(lambda x: x['officer_role'] == 'director', data['items']))
        if director_count > 1:
            line += [True]
        elif director_count == 1:
            line += [False]
        else:
            line = ['no directors'] * 3 + [''] * 12
        return pd.DataFrame([line], columns=['title', 'first_name', 'surname', 'occupation', 'country_of_residence',
                                             'nationality', 'appointed_on',
                                             'address_premises', 'address_line_1', 'address_line_2',
                                             'address_country', 'address_locality', 'address_postcode',
                                             'address_region', 'multi_director'])


class Task:
    def __init__(self, prefix, company_number):
        self.prefix = prefix
        self.company_number = company_number

    def __call__(self):
        """Find officer and company data and combine and save it"""
        comp_id = self.prefix + str(self.company_number)
        print(comp_id)
        try:
            # initialise company class
            comp = Company(comp_id)
            # initialise officer class
            off = Officers(comp_id)
            # retrieve and concatenate
            self.result = pd.concat([comp(), off()], axis=1)

        except NameError:
            # become poison pill once latest is done
            self.result = None

    def output(self):
        return self.result


class Consumer(multiprocessing.Process):
    """Handle process and re-queue complete tasks"""
    def __init__(self, waiting_queue, complete_queue):
        multiprocessing.Process.__init__(self)
        self.waiting_queue = waiting_queue
        self.complete_queue = complete_queue

    def run(self):
        """process tasks until queue is empty"""
        proc_name = self.name
        while True:
            current_task = self.waiting_queue.get()
            current_task()
            if current_task.output() is None:
                print('{}: Exiting, poison pill reached'.format(proc_name))
                self.waiting_queue.task_done()
                break
            self.waiting_queue.task_done()
            self.complete_queue.put(current_task)
        print('{}: complete'.format(proc_name))


class Shepard:
    """Handle life of Consumers, Queues and Tasks"""
    def __init__(self):
        pass

    def __call__(self, prefix, start_point):

        # initialize queues
        todo = multiprocessing.JoinableQueue()
        finished = multiprocessing.JoinableQueue()

        # start consumers
        num_consumers = multiprocessing.cpu_count() * 2
        consumers = [Consumer(todo, finished) for i in range(num_consumers)]
        for q in consumers:
            q.start()

        # decide on (max) end limit
        start = int(start_point)
        max_record_range = 1000
        end = start + max_record_range

        # Enqueue jobs
        for i in range(start, end):
            todo.put(Task(prefix, i))
        print('Processes joining')

        # wait for processes to join
        for p in consumers:
            p.join()
        print('Processes joined')

        # process results - UNFINISHED
        pass

        # return results - UNFINISHED
        return 'results!'


if __name__ == '__main__':
    # paths to data
    data_directory = r'C:\Users\hdewinton\OneDrive - Advanced Payment Solutions\Python\Corporate DM\data'
    base = r'\base'

    # load start points:
    init = {"England": 10926071, "Scotland": 574309, "Ireland": 647561}

    # gather data for each catagory
    master = Shepard()
    ireland = master('NI', init['Ireland'])
    scotland = master('SC', init['Scotland'])
    england = master('', init['England'])

Upvotes: 2

Views: 5009

Answers (1)

Harry de winton

Reputation: 1069

TL;DR - the hang (the consumers getting stuck in limbo and failing to join) can be fixed by changing this:

finished = multiprocessing.JoinableQueue()

to this:

manager = multiprocessing.Manager()
finished = manager.Queue()

Details - "When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager." from the documentation

Details - The second queue, of finished items, triggers one of the aforementioned surprising consequences once a certain number of tasks have been added to it. Below that limit there is no problem; above it, the consumers never join. This does not occur in the dummy script because the second queue, while present, is never used. The limit depends on the size and complexity of the Task objects, so I reckon it has to do with the feeder thread flushing pickled data only after a certain volume is reached: a consumer process cannot exit until its feeder thread has flushed everything it put on the queue into the underlying pipe, and since nothing drains the finished queue while the parent is blocked in join(), the pipe fills up and the flush never completes - a deadlock.

Addendum - Another error appears once the fix has been implemented: a pipe error, because the consumers of the todo queue are terminated before that queue is empty, leaving the queue's underlying pipe with no connection object to send data to. On Windows this shows up as WinError 232. Not to worry, though: the pipe error can be fixed by emptying the queue before the consumers exit. Simply add this to the Consumer class's run method, after the main while loop:

while not self.waiting_queue.empty():
    try:
        self.waiting_queue.get(timeout=0.001)
    except queue.Empty:
        pass
self.waiting_queue.close()

This removes every element from the queue. Make sure it comes after the main while loop, and the pipe error should not occur, because the consumers empty the queue before terminating.

Upvotes: 6
