showkey

Reputation: 338

How to pass an argument into threading?

I want to add 5 to every element in range(1,100) with the threading module, to see which result comes from which thread. I have finished most of the code, but how do I pass an argument into threading.Thread?

import threading,queue
x=range(1,100)
y=queue.Queue()
for i in x:
    y.put(i)

def myadd(x):
    print(x+5)


for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd,args=x).start() #it is wrong here
    y.join()

Thanks to dano, it works now. In order to run it interactively, I rewrote it as:

Method 1: run interactively.

from concurrent.futures import ThreadPoolExecutor
import threading
x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))

def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()
run()

Method 2:

from concurrent.futures import ThreadPoolExecutor
import threading
x = range(1, 100)
def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))
def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()
if __name__=="__main__":
    run()

What if more arguments need to be passed into ThreadPoolExecutor? I want to calculate 1+3, 2+4, 3+5 and so on up to 100+102 with the multiprocessing module. And what about 20+1, 20+2, 20+3 up to 20+100 with the multiprocessing module?

from multiprocessing.pool import ThreadPool
do = ThreadPool(5)
def myadd(x,y):
    print(x+y)

do.apply(myadd,range(3,102),range(1,100))

How to fix it?

Upvotes: 1

Views: 14316

Answers (3)

DevPlayer

Reputation: 5569

From:

import threading,queue
x=range(1,100)
y=queue.Queue()
for i in x:
    y.put(i)

def myadd(x):
    print(x+5)


for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd,args=x).start() #it is wrong here
    y.join()

To:

import threading
import queue

# So print() in various threads doesn't garble text; 
# I hear it is better to use RLock() instead of Lock().
screen_lock = threading.RLock() 

# I think range() is an iterator or generator. Thread safe?
argument1 = range(1, 100)
argument2 = [100,] * 100 # will add 100 to each item in argument1

# zip() pairs the two sequences into tuples; in Python 3 it returns a
# lazy iterator, in Python 2 a list. The tuples themselves are immutable.
data = zip(argument1, argument2)

# object where multiple threads can grab data while avoiding deadlocks.
q = queue.Queue()

# Fill the thread-safe queue with mock data
for item in data:
    q.put(item)

# It could be wiser to use one queue for each inbound data stream.
# For example one queue for file reads, one queue for console input,
# one queue for each network socket. Remembering that rates of 
# reading files and console input and receiving network traffic all
# differ and you don't want one I/O operation to block another.
# inbound_file_data = queue.Queue()
# inbound_console_data = queue.Queue() # etc.

# This function is a thread target
def myadd(thread_name, a_queue):

    # This thread-targeted function blocks only within each thread;
    # at a_queue.get() and at a_queue.put() (if queue is full).
    #
    # Each thread targeting this function has its own copy of
    # this function's local namespace. So each thread will
    # pause when the queue is empty, on queue.get(), or when 
    # the queue is full, on queue.put(). With one queue, this 
    # means all threads will block at the same time, when the 
    # single queue is full or when the single queue is empty 
    # unless we check for the number of remaining items in the
    # queue before we do a queue.get() and if none remain in the 
    # queue just exit this function. This presumes the data is 
    # not a continuous, slow stream like a network connection 
    # or a rotating log file but limited like a closed file.

    # Let each thread continue to read from the global 
    # queue until it is empty. 
    # 
    # This is a bad use-case for using threading. 
    # 
    # If each thread had a separate queue it would be 
    # a better use-case. You don't want one slow stream of 
    # data blocking the processing of a fast stream of data.
    #
    # For a single stream of data it is likely better just not 
    # to use threads. However here is a single "global" queue 
    # example...

    # presumes a_queue starts off not empty
    while a_queue.qsize():
        arg1, arg2 = a_queue.get() # blocking call

        # prevent console/tty text garble; a blocking acquire() always
        # returns True, so the with-statement form is equivalent and
        # guarantees the lock is released even if print() raises
        with screen_lock:
            print('{}: {}'.format(thread_name, arg1 + arg2))
            print('{}: {}'.format(thread_name, arg1 + 5))
            print()

        # allows .join() to keep track of when queue finished
        a_queue.task_done()


# create threads and pass in thread name and queue to thread-target function
threads = []
for i in range(5):
    thread_name = 'Thread-{}'.format(i)
    thread = threading.Thread(
        name=thread_name, 
        target=myadd, 
        args=(thread_name, q))

    # Recommended:
    # queues = [queue.Queue() for index in range(5)] # one per thread; put near top of file
    # thread = threading.Thread(
    #   target=myadd, 
    #   name=thread_name, 
    #   args=(thread_name, queues[i],))
    threads.append(thread)

# some applications should start threads after all threads are created.
for thread in threads:
    thread.start()

# Each thread will pull items off the queue. Because the while loop in 
# myadd() ends with the queue.qsize() == 0 each thread will terminate 
# when there is nothing left in the queue.

Upvotes: 0

dano

Reputation: 94871

It looks like you're trying to create a thread pool manually, so that five threads are used to compute all 100 results. If this is the case, I would recommend using multiprocessing.pool.ThreadPool for this:

from multiprocessing.pool import ThreadPool
import threading
import queue

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(
               threading.current_thread(), x+5))

t = ThreadPool(5)
t.map(myadd, x)
t.close()
t.join()

If you're using Python 3.x, you could use concurrent.futures.ThreadPoolExecutor instead:

from concurrent.futures import ThreadPoolExecutor
import threading

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))

t = ThreadPoolExecutor(max_workers=5)
t.map(myadd, x)
t.shutdown()

I think there are two issues with your original code. First, you need to pass a tuple to the args keyword argument, not a single element:

threading.Thread(target=myadd,args=(x,))

However, you're also trying to pass the entire list (or range object, if using Python 3.x) returned by range(1,100) to myadd, which isn't really what you want to do. It's also not clear what you're using the queue for. Maybe you meant to pass that to myadd?
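For illustration only (not the pooled approach recommended above), here is a minimal sketch of spawning one thread per element and passing a single value through the args tuple; the range(1, 6) slice is just to keep the output short:

import threading

def myadd(x):
    print(x + 5)

# One thread per element; note the trailing comma, which makes
# args a one-element tuple rather than a bare value.
threads = [threading.Thread(target=myadd, args=(i,)) for i in range(1, 6)]
for t in threads:
    t.start()
for t in threads:
    t.join()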

One final note: Python uses a Global Interpreter Lock (GIL), which prevents more than one thread from using the CPU at a time. This means that doing CPU-bound operations (like addition) in threads provides no performance boost in Python, since only one of the threads will ever run at a time. Therefore, in Python it's preferred to use multiple processes to parallelize CPU-bound operations. You could make the above code use multiple processes by replacing the ThreadPool in the first example with Pool from the multiprocessing module. In the second example, you would use ProcessPoolExecutor instead of ThreadPoolExecutor. You would also probably want to replace threading.current_thread() with os.getpid().
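As a sketch of what that swap might look like for the first example (the if __name__ == "__main__" guard is required by multiprocessing on platforms that spawn new interpreters, such as Windows):

from multiprocessing import Pool
import os

x = range(1, 100)

def myadd(x):
    print("Current process: {}. Result: {}.".format(os.getpid(), x + 5))

if __name__ == "__main__":
    p = Pool(5)      # five worker processes instead of five threads
    p.map(myadd, x)
    p.close()
    p.join()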

Edit:

Here's how to handle the case where there are two different args to pass:

from multiprocessing.pool import ThreadPool

def myadd(x,y):
    print(x+y)

def do_myadd(x_and_y):
    return myadd(*x_and_y)

do = ThreadPool(5)    
do.map(do_myadd, zip(range(3, 102), range(1, 100)))

We use zip to pair together the values from the two ranges:

[(3, 1), (4, 2), (5, 3), ...]

We use map to call do_myadd with each tuple in that list, and do_myadd uses tuple expansion (*x_and_y), to expand the tuple into two separate arguments, which get passed to myadd.
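For the second case mentioned in the question (a fixed 20 added to every value from 1 to 100), one option, not covered above, is to bind the constant with functools.partial so that map only has to supply the varying argument. A minimal sketch:

from multiprocessing.pool import ThreadPool
from functools import partial

def myadd(x, y):
    print(x + y)

do = ThreadPool(5)
# partial(myadd, 20) behaves like a one-argument function: y -> myadd(20, y)
do.map(partial(myadd, 20), range(1, 100))
do.close()
do.join()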

Upvotes: 0

Avinash Babu

Reputation: 6252

Here you need to pass a tuple rather than using a single element.

To make a tuple, the code would be:

dRecieved = connFile.readline()
processThread = threading.Thread(target=processLine, args=(dRecieved,))
processThread.start()

Please refer here for a more detailed explanation.

Upvotes: 2
