Reputation: 2849
So I am working on a little Python tool to stress test an application's API.
I've got a pretty nice script using Threading, but then I read that it will require manual coding to maintain n concurrent threads (meaning, starting new ones as soon as old ones finish), and the suggestion here: How to start a new thread when old one finishes? is to use a ThreadPool. I tried as follows:
def test_post():
    print "Executing in " + threading.currentThread().getName() + "\n"
    time.sleep(randint(1, 3))
    return randint(1, 5), "Message"

if args.send:
    code, content = post()
    print (code, "\n")
    print (content)
elif args.test:
    # Create new threads
    print threads
    results_list = []
    pool = ThreadPool(processes=threads)
    results = pool.apply_async(test_post())
    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.
    # results = list(pool.imap_unordered(
    #     test_post(), ()
    # ))
    # thread_list = []
    # while threading.activeCount() <= threads:
    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)
    #     thread.start()
    #     thread_list.append(thread)
    print "Exiting Main Thread" + "\n"
else:
    print ("cant get here!")
When I invoke the script, I get consistent output such as:
4
Executing in MainThread
Exiting Main Thread
I am not sure why... as you can see in the commented-out block, I tried different ways and it still runs only once.
My goal is to make the script run in a loop, always keeping n threads running at any time.
The test_post (and respectively, post) functions return the HTTP response code and the content - I would like to later use this to print/stop when the response code is NOT 200 OK.
Upvotes: 1
Views: 1049
Reputation: 21694
Your first problem is that you already called your function in the MainThread by calling:
pool.apply_async(test_post())
...instead of passing test_post as an argument for a call to be executed in a worker-thread with:
pool.apply_async(test_post)
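For illustration, here is a minimal sketch of the difference, with a simplified stand-in for test_post (no real HTTP call, the body is just an assumption for demonstration):

from multiprocessing.pool import ThreadPool
from threading import current_thread
import time

def test_post():
    print("Executing in " + current_thread().name)
    time.sleep(1)
    return 200, "Message"

pool = ThreadPool(processes=4)

# Wrong: test_post() is executed right here in the MainThread; only its
# return value (a tuple) would be handed to apply_async as the "function".
# pool.apply_async(test_post())

# Right: the function object itself is passed and executed in a worker-thread.
async_result = pool.apply_async(test_post)
print(async_result.get())  # blocks until the worker is done

pool.close()
pool.join()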
OP: I've got a pretty nice script using Threading, but then I read that it will require manual coding to maintain n number of concurrent threads (meaning, starting new ones as soon as old ones finish) ...
You need to distinguish between the unit of work (a job or task) and a thread. The whole point of using a pool in the first place is re-using the executors, be it threads or processes. The workers are already created when a Pool is instantiated, and as long as you don't close the Pool, all initial threads stay alive. So you don't care about recreating threads; you just call pool-methods of an existing pool as often as you have some work you want to distribute. Pool takes these jobs (a pool-method call) and creates tasks out of them. These tasks are put on an unbounded queue. Whenever a worker is finished with a task, it will blockingly try to get() a new task from this internal inqueue.
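As an illustration of that reuse, submitting more tasks than there are workers makes the same thread names show up repeatedly (task here is just an illustrative stand-in for real work):

from multiprocessing.pool import ThreadPool
from threading import current_thread
import time

def task(n):
    time.sleep(0.5)
    return "task {} ran in {}".format(n, current_thread().name)

pool = ThreadPool(4)  # the four worker-threads are created here

# Eight tasks for four workers: each worker pulls a new task from the
# internal queue as soon as it finishes the previous one.
for line in pool.map(task, range(8)):
    print(line)

pool.close()
pool.join()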
OP: Pool only executes a single thread instead of 4...I tried different ways and it still does it only once.
pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
...is a single-call, single-task-producing job. In case you want more than one execution of func, you either have to call pool.apply_async() multiple times, or you use a mapping pool-method like
pool.map(func, iterable, chunksize=None)
...which maps one function over an iterable. pool.apply_async is non-blocking, that is why it is "async". It immediately returns an AsyncResult-object you can (blockingly) call .wait() or .get() upon.
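A short sketch of both options, again with a simplified stand-in for test_post (the body is assumed, no real HTTP call):

from multiprocessing.pool import ThreadPool
from random import randint
import time

def test_post():  # simplified stand-in for the OP's function
    time.sleep(randint(1, 3))
    return randint(1, 5), "Message"

pool = ThreadPool(processes=4)

# Variant 1: one task per apply_async call, results collected via AsyncResult.get()
async_results = [pool.apply_async(test_post) for _ in range(4)]
print([res.get() for res in async_results])  # .get() blocks until each task is done

# Variant 2: map a one-argument function over an iterable (blocks until all are done)
print(pool.map(lambda i: test_post(), range(4)))

pool.close()
pool.join()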
Through the comments it became clear that you want endless and immediate replacements for finished tasks (a self-produced input-stream)... and the program should stop on KeyboardInterrupt or when a result does not have a certain value.
You can use the callback-parameter of apply_async to schedule new tasks as soon as any of the old ones is finished. The difficulty lies in what to do meanwhile with the MainThread to prevent the whole script from ending prematurely while keeping it responsive to KeyboardInterrupt. Letting the MainThread sleep in a loop lets it still react immediately upon KeyboardInterrupt while preventing an early exit. In case a result should stop the program, you can let the callback terminate the pool. The MainThread then just has to include a check of the pool-status in its sleep-loop.
import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool


def test_post(post_id):
    time.sleep(randint(1, 3))
    status_code = choice([200] * 9 + [404])
    return "{} {} Message no.{}: {}".format(
        datetime.now(), current_thread().name, post_id, status_code
    ), status_code


def handle_result(result):
    msg, code = result
    print(msg)
    if code != 200:
        print("terminating")
        pool.terminate()
    else:
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )


if __name__ == '__main__':

    N_WORKERS = 4
    post_cnt = count()
    pool = ThreadPool(N_WORKERS)

    # initial distribution
    for _ in range(N_WORKERS):
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )

    try:
        while pool._state == 0:  # check if pool is still alive
            time.sleep(1)
    except KeyboardInterrupt:
        print(" got interrupt")
Example Output with KeyboardInterrupt:
$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt
Example Output with termination due to unwanted return value:
$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating
Note: in your scenario you can also call apply_async more often than N_WORKERS times for your initial distribution, to keep a buffer of queued tasks and reduce latency.
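For example, the initial-distribution loop from the script above could queue two tasks per worker (BUFFER_FACTOR is just an illustrative name; N_WORKERS, post_cnt, test_post and handle_result are the ones defined above):

# initial distribution with a buffer: two queued tasks per worker, so a
# finishing worker never has to wait for the callback to schedule new work
BUFFER_FACTOR = 2
for _ in range(N_WORKERS * BUFFER_FACTOR):
    pool.apply_async(
        test_post, args=(next(post_cnt),), callback=handle_result
    )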
Upvotes: 1