flybonzai

Reputation: 3931

How to use locks without causing deadlock in concurrent.futures.ThreadPoolExecutor?

I'm processing Jira changelog history data, and because there is a lot of data and most of the processing time is I/O-bound, I figured an asynchronous approach might work well.

I have a list of all issue_ids, which I'm feeding into a function that makes a request through the jira-python API, extracts the information into a dict, and then writes it out through a passed-in DictWriter. To make the writes thread-safe I imported a Lock() from the threading module, which I'm also passing in. In testing it seems to deadlock at a certain point and just hang. I noticed in the documentation that tasks which depend on one another can hang, and I suppose mine do because of the lock I'm using. How can I prevent this from happening?

Here is my code for reference:

(At this point in the code there is a list called keys with all the issue_ids.)

import logging
from concurrent.futures import ThreadPoolExecutor
from csv import DictWriter
from threading import Lock

from jira import JIRA


def write_issue_history(
        jira_instance: JIRA,
        issue_id: str,
        writer: DictWriter,
        lock: Lock):
    logging.debug('Now processing data for issue {}'.format(issue_id))
    changelog = jira_instance.issue(issue_id, expand='changelog').changelog

    for history in changelog.histories:
        created = history.created
        for item in history.items:
            to_write = dict(issue_id=issue_id)
            to_write['date'] = created
            to_write['field'] = item.field
            to_write['changed_from'] = item.fromString
            to_write['changed_to'] = item.toString
            clean_data(to_write)
            add_etl_fields(to_write)
            print(to_write)
            with lock:
                print('Lock obtained')
                writer.writerow(to_write)

if __name__ == '__main__':
    with open('outfile.txt', 'w') as outf:
        writer = DictWriter(
            f=outf,
            fieldnames=fieldnames,
            delimiter='|',
            extrasaction='ignore'
        )
        writer_lock = Lock()
        with ThreadPoolExecutor(max_workers=5) as exec:
            for key in keys[:5]:
                exec.submit(
                    write_issue_history,
                    j,
                    key,
                    writer,
                    writer_lock
                )

EDIT: It's also very possible I'm being throttled by the Jira API.
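
If throttling is the problem, the issue() call would presumably fail with an HTTP 429 rather than hang forever. A rough retry-with-backoff sketch (the helper name, retry count, and delays are made up for illustration, and it assumes the jira library's JIRAError exposes a status_code attribute, which recent versions do):

import time

from jira import JIRA
from jira.exceptions import JIRAError


def fetch_changelog_with_retry(jira_instance: JIRA, issue_id: str, retries: int = 3):
    """Hypothetical helper: fetch an issue's changelog, backing off on HTTP 429."""
    delay = 1.0
    for attempt in range(retries):
        try:
            return jira_instance.issue(issue_id, expand='changelog').changelog
        except JIRAError as exc:
            # 429 = Too Many Requests; wait a bit longer each time before retrying.
            if exc.status_code == 429 and attempt < retries - 1:
                time.sleep(delay)
                delay *= 2
            else:
                raise

Each worker could call a helper like this instead of hitting jira_instance.issue() directly.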

Upvotes: 1

Views: 3399

Answers (1)

Julien

Reputation: 5729

You need to keep the Future that each executor.submit() call returns, conventionally in a list named futs, and then loop over that list calling result() on each one; result() re-raises any exception that happened in the worker, so errors aren't silently swallowed.

(I'd also change exec to executor, as that's more conventional and avoids shadowing the exec built-in.)

from traceback import print_exc

...

with ThreadPoolExecutor(max_workers=5) as executor:
    futs = []
    for key in keys[:5]:
        futs.append(executor.submit(
            write_issue_history,
            j,
            key,
            writer,
            writer_lock,
        ))

for fut in futs:
    try:
        fut.result()  # re-raises any exception from the worker
    except Exception:
        print_exc()
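
If you'd rather handle results as the workers finish instead of in submission order, concurrent.futures.as_completed gives the same error visibility; a minimal variant of the loop above:

from concurrent.futures import as_completed

for fut in as_completed(futs):
    try:
        fut.result()  # re-raises whatever exception the worker hit
    except Exception:
        print_exc()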

Upvotes: 1
