Reputation: 338
I want to add 5 to every element in range(1,100) using the threading module, and see which result is computed in which thread. I have finished most of the code, but how do I pass an argument into threading.Thread?
import threading,queue
x=range(1,100)
y=queue.Queue()
for i in x:
    y.put(i)
def myadd(x):
    print(x+5)
for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd,args=x).start() #it is wrong here
y.join()
Thanks to dano, it works now. To run it interactively, I rewrote it as:
Method 1: run interactively.
from concurrent.futures import ThreadPoolExecutor
import threading
x = range(1, 100)
def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))
def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()
run()
Method 2:
from concurrent.futures import ThreadPoolExecutor
import threading
x = range(1, 100)
def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))
def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()
if __name__=="__main__":
    run()
What if more arguments need to be passed through ThreadPoolExecutor? I want to calculate 1+3, 2+4, 3+5, and so on up to 100+102 with the multiprocessing module. And what about 20+1, 20+2, 20+3, and so on up to 20+100 with the multiprocessing module?
from multiprocessing.pool import ThreadPool
do = ThreadPool(5)
def myadd(x,y):
    print(x+y)
do.apply(myadd,range(3,102),range(1,100))
How to fix it?
Upvotes: 1
Views: 14316
Reputation: 5569
From:
import threading,queue
x=range(1,100)
y=queue.Queue()
for i in x:
    y.put(i)
def myadd(x):
    print(x+5)
for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd,args=x).start() #it is wrong here
y.join()
To:
import threading
import queue
# A lock so that print() calls from different threads don't interleave.
# A plain Lock() would also work here; RLock() only matters when the
# same thread needs to re-acquire a lock it already holds.
screen_lock = threading.RLock()
# In Python 3, range() is an immutable sequence, not a generator.
# It is only read from the main thread here.
argument1 = range(1, 100)
argument2 = [100] * 100  # will add 100 to each item in argument1
# zip() yields (arg1, arg2) tuples, which are immutable, and stops
# at the end of the shorter input.
data = zip(argument1, argument2)
# object where multiple threads can grab data while avoiding deadlocks.
q = queue.Queue()
# Fill the thread-safe queue with mock data
for item in data:
q.put(item)
# It could be wiser to use one queue for each inbound data stream.
# For example one queue for file reads, one queue for console input,
# one queue for each network socket. Remembering that rates of
# reading files and console input and receiving network traffic all
# differ and you don't want one I/O operation to block another.
# inbound_file_data = queue.Queue()
# inbound_console_data = queue.Queue() # etc.
# This function is a thread target
def myadd(thread_name, a_queue):
    # This thread-targeted function blocks only within each thread,
    # at a_queue.get() and at a_queue.put() (if the queue is full).
    #
    # Each thread targeting this function has its own copy of
    # this function's local namespace. So each thread will
    # pause when the queue is empty, on queue.get(), or when
    # the queue is full, on queue.put(). With one queue, this
    # means all threads will block at the same time, when the
    # single queue is full or when the single queue is empty,
    # unless we stop reading once no items remain in the
    # queue and just exit this function. This presumes the data is
    # not a continuous and slow stream like a network connection
    # or a rotating log file but limited like a closed file.
    # Let each thread continue to read from the global
    # queue until it is empty.
    #
    # This is a bad use-case for using threading.
    #
    # If each thread had a separate queue it would be
    # a better use-case. You don't want one slow stream of
    # data blocking the processing of a fast stream of data.
    #
    # For a single stream of data it is likely better just not
    # to use threads. However here is a single "global" queue
    # example...
    # presumes a_queue starts off not empty
    while True:
        try:
            # Non-blocking get: checking qsize() and then calling a
            # blocking get() can hang forever if another thread takes
            # the last item between the two calls.
            arg1, arg2 = a_queue.get_nowait()
        except queue.Empty:
            break
        # prevent console/tty text garble
        with screen_lock:
            print('{}: {}'.format(thread_name, arg1 + arg2))
            print('{}: {}'.format(thread_name, arg1 + 5))
            print()
        # allows .join() to keep track of when queue finished
        a_queue.task_done()
# create threads and pass in thread name and queue to thread-target function
threads = []
for i in range(5):
    thread_name = 'Thread-{}'.format(i)
    thread = threading.Thread(
        name=thread_name,
        target=myadd,
        args=(thread_name, q))
    # Recommended:
    # queues = [queue.Queue() for index in range(5)] # put at top of file
    # thread = threading.Thread(
    #     target=myadd,
    #     name=thread_name,
    #     args=(thread_name, queues[i],))
    threads.append(thread)
# some applications should start threads after all threads are created.
for thread in threads:
    thread.start()
# Each thread will pull items off the queue. Because the while loop in
# myadd() ends once the queue is empty, each thread will terminate
# when there is nothing left in the queue.
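The "one queue per stream" idea from the comments could look roughly like the sketch below. The names worker, run and SENTINEL are mine, not from the code above, and the end-of-stream marker is an assumption about how each stream signals that it is finished.

```python
import queue
import threading

SENTINEL = None  # hypothetical end-of-stream marker (an assumption)


def worker(name, in_queue, results):
    """Consume (a, b) pairs from this thread's own queue until the sentinel."""
    while True:
        item = in_queue.get()  # blocks only this thread, on its own queue
        if item is SENTINEL:
            break
        a, b = item
        results.put((name, a + b))  # queue.Queue is safe to share


def run(streams):
    """Start one worker per input stream and return sorted (name, sum) pairs."""
    results = queue.Queue()
    threads = []
    for index, pairs in enumerate(streams):
        in_queue = queue.Queue()
        for pair in pairs:
            in_queue.put(pair)
        in_queue.put(SENTINEL)
        threads.append(threading.Thread(
            target=worker,
            args=("Thread-{}".format(index), in_queue, results)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    collected = []
    while not results.empty():  # reliable here: all producers have finished
        collected.append(results.get())
    return sorted(collected)


if __name__ == "__main__":
    for name, total in run([[(1, 100), (2, 100)], [(3, 100)]]):
        print("{}: {}".format(name, total))
```

Because every worker waits only on its own queue, a slow stream cannot stall the thread that is reading a fast one.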
Upvotes: 0
Reputation: 94871
It looks like you're trying to create a thread pool manually, so that five threads are used to process all of the values in the range. If this is the case, I would recommend using multiprocessing.pool.ThreadPool for this:
from multiprocessing.pool import ThreadPool
import threading
x = range(1, 100)
def myadd(x):
    print("Current thread: {}. Result: {}.".format(
        threading.current_thread(), x+5))
t = ThreadPool(5)
t.map(myadd, x)
t.close()
t.join()
If you're using Python 3.x, you could use concurrent.futures.ThreadPoolExecutor instead:
from concurrent.futures import ThreadPoolExecutor
import threading
x = range(1, 100)
def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))
t = ThreadPoolExecutor(max_workers=5)
t.map(myadd, x)
t.shutdown()
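If you'd rather collect the results than print them from inside the workers, a variation along these lines might help. It is only a sketch: the run helper and its workers parameter are names I made up, and myadd is changed to return a value.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def myadd(x):
    # Return the worker thread's name with the result instead of printing.
    return threading.current_thread().name, x + 5


def run(values, workers=5):
    # Leaving the with-block calls shutdown(), which waits for all tasks.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map() yields results in input order, whichever thread ran them.
        return list(executor.map(myadd, values))


if __name__ == "__main__":
    for thread_name, result in run(range(1, 100)):
        print("{}: {}".format(thread_name, result))
```

Printing from the main thread also sidesteps interleaved output from several workers.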
I think there are two issues with your original code. First, you need to pass a tuple to the args keyword argument, not a single element:
threading.Thread(target=myadd,args=(x,))
However, you're also trying to pass the entire list (or range object, if using Python 3.x) returned by range(1,100) to myadd, which isn't really what you want to do. It's also not clear what you're using the queue for. Maybe you meant to pass that to myadd?
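If passing the queue to myadd is what you had in mind, it might look something like this. This is a sketch under my own assumptions: the extra results list, the lock, and the run helper are not in your code.

```python
import queue
import threading


def myadd(work_queue, results, lock):
    # Keep taking numbers until the shared queue is empty.
    while True:
        try:
            value = work_queue.get_nowait()
        except queue.Empty:
            return
        with lock:  # guard the shared results list
            results.append((threading.current_thread().name, value + 5))


def run(values, num_threads=5):
    work_queue = queue.Queue()
    for value in values:
        work_queue.put(value)
    results = []
    lock = threading.Lock()
    # args is a tuple with one entry per positional parameter of myadd.
    threads = [
        threading.Thread(target=myadd, args=(work_queue, results, lock))
        for _ in range(num_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    for thread_name, result in run(range(1, 100)):
        print("{}: {}".format(thread_name, result))
```

The order of the results depends on thread scheduling, so sort them if you need a stable order.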
One final note: Python (CPython) uses a Global Interpreter Lock (GIL), which prevents more than one thread from executing Python bytecode at a time. This means that doing CPU-bound operations (like addition) in threads provides no performance boost in Python, since only one of the threads will ever run at a time. Therefore, in Python it's preferred to use multiple processes to parallelize CPU-bound operations. You could make the above code use multiple processes by replacing the ThreadPool in the first example with from multiprocessing import Pool. In the second example, you would use ProcessPoolExecutor instead of ThreadPoolExecutor. You would also probably want to replace threading.current_thread() with os.getpid().
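For the second example, the process-based version might look roughly like this. Treat it as a sketch: the run helper is a name I made up, and the __main__ guard is there because some platforms start worker processes by re-importing the module.

```python
from concurrent.futures import ProcessPoolExecutor
import os


def myadd(x):
    # Return the worker's process ID together with the result.
    return os.getpid(), x + 5


def run(values, workers=5):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(myadd, values))


if __name__ == "__main__":
    for pid, result in run(range(1, 100)):
        print("Process {}: {}".format(pid, result))
```

For work as small as a single addition, the cost of sending data between processes will likely outweigh any gain, so this mainly pays off for heavier computations.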
Edit:
Here's how to handle the case where there are two different args to pass:
from multiprocessing.pool import ThreadPool
def myadd(x,y):
    print(x+y)
def do_myadd(x_and_y):
    return myadd(*x_and_y)
do = ThreadPool(5)
do.map(do_myadd, zip(range(3, 102), range(1, 100)))
We use zip to pair together the corresponding values from the two ranges (a list in Python 2, a lazy iterator in Python 3):
[(3, 1), (4, 2), (5, 3), ...]
We use map to call do_myadd with each tuple in that sequence, and do_myadd uses tuple expansion (*x_and_y) to expand the tuple into two separate arguments, which get passed to myadd.
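On Python 3.3+ you may not need the wrapper at all, since the pool has a starmap method that unpacks each tuple for you. For the case where one argument is fixed (20+1, 20+2, ...), functools.partial is one option. A sketch, with pairwise_sums and constant_sums as made-up helper names and myadd changed to return its result:

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def myadd(x, y):
    return x + y


def pairwise_sums():
    # 1+3, 2+4, ..., 99+101 (the same ranges as in the code above)
    with ThreadPool(5) as pool:
        return pool.starmap(myadd, zip(range(1, 100), range(3, 102)))


def constant_sums():
    # 20+1, 20+2, ..., 20+100: partial() fixes the first argument at 20
    with ThreadPool(5) as pool:
        return pool.map(partial(myadd, 20), range(1, 101))


if __name__ == "__main__":
    print(pairwise_sums()[:3])
    print(constant_sums()[:3])
```

With ThreadPoolExecutor, executor.map(myadd, range(1, 100), range(3, 102)) should work as well, because its map accepts several iterables.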
Upvotes: 0
Reputation: 6252
Here you need to pass a tuple rather than a single element.
To make a one-element tuple, add a trailing comma after the element:
dRecieved = connFile.readline()
processThread = threading.Thread(target=processLine, args=(dRecieved,))
processThread.start()
Please refer here for more explanation.
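A small self-contained illustration of why the trailing comma matters. The names process_line and received are made up for this example, and the line is a hard-coded string rather than something read from a connection.

```python
import threading

received = []


def process_line(line):
    received.append(line)


line = "hello\n"

# args=(line,) is a one-element tuple, so process_line gets the whole string.
# Writing args=line would unpack the string into one argument per character,
# and the call inside the thread would fail with a TypeError.
thread = threading.Thread(target=process_line, args=(line,))
thread.start()
thread.join()

# Passing kwargs avoids the trailing-comma pitfall altogether.
thread = threading.Thread(target=process_line, kwargs={"line": line})
thread.start()
thread.join()

print(received)
```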
Upvotes: 2