Hani Goc

Reputation: 2441

Cannot increment a value when using a thread pool in Python

I am using the ThreadPool class from the "Python Thread Pool" Python recipe to simulate thread pooling. I am trying to increment the counter defined in the function test, but it stays at 0. I used a lock, as explained in "Is this simple python code thread safe", but it still doesn't work.


Source code

#! /usr/bin/python
# -*- coding: utf-8 -*-
from Queue import Queue
from threading import Thread
import threading

lock = threading.Lock()

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()
    
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try: func(*args, **kargs)
            except Exception, e: print e
            self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

def exp1_thread(counter):
  with lock:
    print counter
    counter = counter + 1

def test():
  # 1) Init a Thread pool with the desired number of threads
  pool = ThreadPool(6)
  counter = 0
  # 2) Add the tasks to the queue
  for i in range(0, 10):
    pool.add_task(exp1_thread,counter)
  
  # 3) Wait for completion
  pool.wait_completion()
    
if __name__ == "__main__": 
  test()

output

counter 0
counter 0
counter 0
counter 0
counter 0
counter 0
counter 0
counter 0
counter 0
counter 0

Upvotes: 0

Views: 1824

Answers (3)

Torxed

Reputation: 23500

An int behaves differently from a dictionary here. As I just realized, that's because with a dict or a list you are passing the object itself, which the thread can modify in place, whereas an int is immutable, so assigning to it inside the function only changes a local name. This is the logic I usually use, and it works.

Either declare your variables as globals (but be careful), or use, say, a dictionary with individual key slots for the different threads and sum them up at the end.

from threading import Thread
from time import sleep

myMap = {'counter' : 0}

class worker(Thread):
    def __init__(self, counterMap):
        Thread.__init__(self)
        self.counterMap = counterMap
        self.start()

    def run(self):
        self.counterMap['counter'] += 1

worker(myMap)
sleep(0.2)
worker(myMap)

sleep(0.2)
print(myMap)
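
The "individual key slots" idea mentioned above can be sketched roughly as follows. This is only an illustration, written in Python 3 rather than the Python 2 used elsewhere on this page; the names work and count_with_slots are made up for the example. It assumes every key is created before the threads start and that each thread writes only to its own key, and it uses join() to wait for the threads instead of sleeping.

```python
import threading

def work(slots, key, n):
    # Each thread only ever touches its own, pre-created key.
    for _ in range(n):
        slots[key] += 1

def count_with_slots(num_threads=4, n=1000):
    # Hypothetical helper: one slot per thread, created up front.
    slots = {i: 0 for i in range(num_threads)}
    threads = [
        threading.Thread(target=work, args=(slots, i, n))
        for i in range(num_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread to finish
    # Combine the per-thread results once all threads are done.
    return sum(slots.values())

if __name__ == "__main__":
    print(count_with_slots())
```

Because no two threads update the same slot, the increments cannot race with each other; the only shared step is the final sum, which runs after all threads have been joined.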

Upvotes: 0

ssundarraj

Reputation: 820

This has nothing to do with threading. The actual reason is that int is immutable in Python. A function that just increments an int does not have the desired effect, because x += 1 binds the local name to a new int instead of changing the caller's variable.

def inc(x):
    x += 1
y = 0
inc(y)
print y  # 0

If you want to increment the number, you can store it in a mutable datatype (such as a list or dict) and modify that object in place.
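
As a small side-by-side sketch of that suggestion (Python 3 syntax; inc_rebind, inc_in_place and box are names invented for this example), compare rebinding an int with mutating a one-element list:

```python
def inc_rebind(x):
    # Rebinds the local name x only; the caller's variable is untouched.
    x += 1

def inc_in_place(box):
    # Mutates the shared list object, so the caller sees the change.
    box[0] += 1

y = 0
inc_rebind(y)

box = [0]
inc_in_place(box)

print(y, box[0])  # prints: 0 1
```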

Upvotes: 1

eirikjak

Reputation: 76

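The reason you are getting all zeros is that counter = counter + 1 only rebinds the local name inside exp1_thread. Python passes a reference to the int object, but ints are immutable, so every task effectively works on its own copy of the value 0 and the counter in test never changes.

You can fix this by passing a mutable object that all the tasks share and updating it in place.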

Option 1. Define counter as a list you pass around:

def exp1_thread(counter):
    with lock:
        print counter[0]
        counter[0] = counter[0] + 1

def test():
    # 1) Init a Thread pool with the desired number of threads
    pool = ThreadPool(6)
    counter = [0]
    for i in range(0, 10):
        pool.add_task(exp1_thread, counter)

    # 3) Wait for completion
    pool.wait_completion()

Option 2. Create an object you pass around.

class Counter:
    def __init__(self, initial_count):
        self.count = initial_count

def exp1_thread(counter):
    with lock:
        print counter.count
        counter.count = counter.count + 1

def test():
    # 1) Init a Thread pool with the desired number of threads
    pool = ThreadPool(6)
    counter = Counter(0)
    for i in range(0, 10):
        pool.add_task(exp1_thread, counter)

    # 3) Wait for completion
    pool.wait_completion()
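
A possible variation on Option 2, sketched in Python 3: the counter object can own its lock, so callers cannot forget to take it. SafeCounter and run_tasks are hypothetical names, and the standard library's concurrent.futures.ThreadPoolExecutor stands in here for the ThreadPool recipe from the question; this is an assumption for illustration, not the recipe's API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class SafeCounter:
    """Hypothetical helper: a counter that guards its own state with a lock."""

    def __init__(self, initial=0):
        self._value = initial
        self._lock = threading.Lock()

    def increment(self):
        # The read-modify-write happens entirely under the lock.
        with self._lock:
            self._value += 1
            return self._value

    @property
    def value(self):
        with self._lock:
            return self._value

def run_tasks(num_tasks=10, num_threads=6):
    counter = SafeCounter()
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for _ in range(num_tasks):
            pool.submit(counter.increment)
    # Leaving the with-block waits for all submitted tasks to finish.
    return counter.value

if __name__ == "__main__":
    print(run_tasks())  # prints: 10
```

Keeping the lock inside the class means the task functions stay short, and there is one place to look when checking that every update is protected.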

Upvotes: 2
