Reputation: 3931
I'm processing Jira changelog history data. Since there is a large amount of data and most of the processing time is spent on I/O, I figured an asynchronous approach might work well.
I have a list of all issue_id's, which I'm feeding into a function that makes a request through the jira-python API, extracts the information into a dict, and then writes it out through a passed-in DictWriter. To make it thread-safe, I imported a Lock from the threading module, which I am also passing in. On testing, it seems to get deadlocked at a certain point and just hangs. I noticed in the documentation that if tasks are reliant on one another they can hang, and I suppose mine are, due to the lock I'm using. How can I prevent this from happening?
Here is my code for reference:
(At this point in the code there is a list called keys with all the issue_id's.)
import logging
from csv import DictWriter
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

from jira import JIRA

def write_issue_history(
        jira_instance: JIRA,
        issue_id: str,
        writer: DictWriter,
        lock: Lock):
    logging.debug('Now processing data for issue {}'.format(issue_id))
    changelog = jira_instance.issue(issue_id, expand='changelog').changelog
    for history in changelog.histories:
        created = history.created
        for item in history.items:
            to_write = dict(issue_id=issue_id)
            to_write['date'] = created
            to_write['field'] = item.field
            to_write['changed_from'] = item.fromString
            to_write['changed_to'] = item.toString
            clean_data(to_write)
            add_etl_fields(to_write)
            print(to_write)
            with lock:
                print('Lock obtained')
                writer.writerow(to_write)

if __name__ == '__main__':
    with open('outfile.txt', 'w') as outf:
        writer = DictWriter(
            f=outf,
            fieldnames=fieldnames,
            delimiter='|',
            extrasaction='ignore'
        )
        writer_lock = Lock()
        with ThreadPoolExecutor(max_workers=5) as exec:
            for key in keys[:5]:
                exec.submit(
                    write_issue_history,
                    j,
                    key,
                    writer,
                    writer_lock
                )
EDIT: It's also very possible I'm being throttled by the Jira API.
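If throttling is the culprit, a simple retry wrapper with exponential backoff can make the requests more resilient. This is only a sketch: with_retries, attempts, and base_delay are illustrative names, not part of jira-python, and you would wrap the jira_instance.issue(...) call in it.

```python
import time

def with_retries(fn, attempts=3, base_delay=1.0):
    """Call fn(), retrying with exponential backoff if it raises.

    Illustrative helper: on failure, sleeps base_delay, then 2x, 4x, ...
    before retrying; re-raises after the final attempt.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```

Inside write_issue_history this would look like `with_retries(lambda: jira_instance.issue(issue_id, expand='changelog'))`.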
Upvotes: 1
Views: 3399
Reputation: 5729
You need to store the futures returned by executor.submit() in a list, conventionally named futs, and then loop through that list calling result() on each one to get its result, handling any errors that might have happened.
(I'd also rename exec to executor, as that's more conventional and avoids shadowing the built-in.)
from traceback import print_exc
...
with ThreadPoolExecutor(max_workers=5) as executor:
    futs = []
    for key in keys[:5]:
        futs.append(executor.submit(
            write_issue_history,
            j,
            key,
            writer,
            writer_lock,
        ))
    for fut in futs:
        try:
            fut.result()
        except Exception:
            print_exc()
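If you'd rather see failures as soon as they happen instead of in submission order, concurrent.futures.as_completed yields each future as it finishes. A minimal, self-contained sketch (run_all, func, and args_list are illustrative names, not from the question's code):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_all(func, args_list, max_workers=5):
    """Submit func(arg) for each arg; collect results, reporting
    exceptions as each future completes rather than in submit order."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its argument so errors can be attributed.
        futs = {executor.submit(func, arg): arg for arg in args_list}
        for fut in as_completed(futs):
            try:
                results.append(fut.result())
            except Exception:
                print('task for {!r} failed'.format(futs[fut]))
    return results
```

Either way, the key point is the same: without calling result() somewhere, exceptions raised inside the workers are silently swallowed.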
Upvotes: 1