Reputation: 372
Is it possible to iterate a generator object in Python with asyncio? I made a simple function named hash_generator() which returns a unique hash. When I benchmark the loop, it takes around 8 seconds to iterate and print 100,000 hashes. Can I run this asynchronously to reduce the time? I read the asyncio documentation but I am confused. I want to explore async, and I want to begin with this problem.
import hashlib
import string
import random
import time
def hash_generator():
    """Return a unique hash"""
    prefix = int(time.time())
    suffix = (random.choice(string.ascii_letters) for i in range(10))
    key = ".".join([str(prefix), str("".join(suffix))])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()
"""Iterating the hashes and printing the time it loaded"""
hashes = (hash_generator() for i in range(100000))
time_before = time.time()
[print(i) for i in hashes]
time_after = time.time()
difference = time_after - time_before
print('Loaded in {0:.2f}sec'.format(difference))
# 40503CBA2DAE
# ...
# A511068F4945
# Loaded in 8.81sec
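To make the question concrete, the only asyncio version I can picture is something like the sketch below (reusing the hash_generator() above; async generators need Python 3.6+ and asyncio.run() needs 3.7+), but I am not sure whether this would actually make the hashing any faster.
import asyncio

async def hash_stream(count):
    # Async generator wrapper; the hashing itself still runs synchronously
    for _ in range(count):
        yield hash_generator()

async def main():
    async for value in hash_stream(100000):
        print(value)

asyncio.run(main())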
The random.choice() call was the main reason the program took so long to run. I rewrote the function below, using the current time and a random string from os.urandom (low collision risk) as the key. I also tried multithreading, but instead of making the task faster it made it slower. Any recommendation for refactoring the code below is welcome.
import hashlib
import time
import os
import timeit
def hash_generator():
    """Return a unique hash"""
    prefix = str(time.time())
    suffix = str(os.urandom(10))
    key = "".join([prefix, suffix])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()
"""Iterating the hashes and printing the time it loaded"""
print(timeit.timeit(hash_generator, number=100000), "sec")
# 0.497149389999322 sec
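For reference, a threaded attempt like the one mentioned above might look like the sketch below (using concurrent.futures.ThreadPoolExecutor, which is an assumption about the exact approach, and reusing the hash_generator() above). Because the hashing is pure CPU work, the GIL keeps the threads from running the Python code in parallel, so the pool only adds overhead.
from concurrent.futures import ThreadPoolExecutor
import timeit

def threaded_hashes(count, workers=4):
    # Hypothetical helper: fan hash_generator() out over a thread pool
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map expects a one-argument callable, so wrap hash_generator
        return list(executor.map(lambda _: hash_generator(), range(count)))

print(timeit.timeit(lambda: threaded_hashes(100000), number=1), "sec")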
With the help of Jack Taylor and other Stack Overflow users, I can see the difference multiprocessing makes at around 1M iterations and above. I benchmarked the code below.
import hashlib
import time
import os
import timeit
import multiprocessing
def hash_generator(_=None):
    """Return a unique hash"""
    prefix = str(time.time())
    suffix = str(os.urandom(10))
    key = "".join([prefix, suffix])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()
# Allows for the safe importing of the main module
if __name__ == "__main__":
start_time = time.time()
number_processes = 4
iteration = 10000000
pool = multiprocessing.Pool(number_processes)
results = pool.map(hash_generator, range(iteration))
pool.close()
pool.join()
end_time = time.time()
pool_runtime = end_time - start_time
print('(Pool) Loaded in: {0:.5f} sec'.format(pool_runtime))
ordinary_runtime = timeit.timeit(hash_generator, number=iteration)
print('(Ordinary) Loaded in: {0:.5f} sec'.format(ordinary_runtime))
iteration     (Pool) Loaded in    (Ordinary) Loaded in
10            1.20685 sec         0.00023 sec
1,000         0.72233 sec         0.01767 sec
1,000         0.99571 sec         0.01208 sec
10,000        1.07876 sec         0.12652 sec
100,000       1.57068 sec         1.23418 sec
1,000,000     4.28724 sec         11.56332 sec
10,000,000    27.26819 sec        132.68170 sec
Upvotes: 1
Views: 287
Reputation: 6207
It looks like you are probably better off with the sequential version. The conventional wisdom is that, in Python, with I/O-bound jobs (file reads/writes, networking) you can get a speed-up by using an event loop or multiple threads, and with CPU-bound jobs (like computing hashes) you can get a speed-up by using multiple processes.
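So asyncio on its own will not make the hashing faster, but in principle a process pool should. If you want an asyncio interface anyway, the usual pattern is to hand the CPU-bound calls to a process pool from the event loop. A rough sketch (not benchmarked, assuming the hash_generator() from your question):
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def async_hashes(count, workers=4):
    # Offload each CPU-bound hash_generator() call to a worker process
    loop = asyncio.get_running_loop()  # Python 3.7+
    with ProcessPoolExecutor(workers) as executor:
        jobs = [loop.run_in_executor(executor, hash_generator) for _ in range(count)]
        return await asyncio.gather(*jobs)

# hashes = asyncio.run(async_hashes(100000))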
However, I took your version and rewrote it using concurrent.futures and a process pool, and instead of speeding it up it made it 10 times slower.
Here's the code:
from concurrent import futures
import hashlib
import string
import random
import time
def hash_generator():
    """Return a unique hash"""
    prefix = int(time.time())
    suffix = (random.choice(string.ascii_letters) for i in range(10))
    key = ".".join([str(prefix), str("".join(suffix))])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()
def main(workers=None):
    """Iterating the hashes and printing the time it loaded"""
    time_before = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        worker_count = executor._max_workers  # private attribute, used only for reporting
        jobs = (executor.submit(hash_generator) for i in range(100000))
        for future in futures.as_completed(jobs):
            print(future.result())
    time_after = time.time()
    difference = time_after - time_before
    print('Loaded in {0:.2f}sec with {1} workers'.format(difference, worker_count))

if __name__ == '__main__':
    main()
# 2BD6056CC0B4
# ...
# D0A6707225EB
# Loaded in 50.74sec with 4 workers
With multiple processes there is some overhead involved with starting and stopping the different processes, and with inter-process communication, which is probably why the multi-process version is slower than the sequential version even though it is using all of the CPU cores.
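One way to reduce that per-task overhead is to batch the work so each process call produces many hashes instead of one, which cuts down on inter-process traffic. A minimal sketch (hash_batch is a hypothetical helper, reusing the hash_generator from your question):
from concurrent import futures

def hash_batch(count):
    # Each worker builds a whole batch, so far fewer results cross process boundaries
    return [hash_generator() for _ in range(count)]

def main_batched(total=100000, workers=4):
    with futures.ProcessPoolExecutor(workers) as executor:
        batches = executor.map(hash_batch, [total // workers] * workers)
        return [h for batch in batches for h in batch]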
You could also try using clustering to split the work over multiple computers, and/or writing the algorithm in a lower-level language (Go strikes me as a good choice). But whether that would be worth your while, I don't know.
Upvotes: 1