Vincent Teyssier
Vincent Teyssier

Reputation: 2217

How to implement a dynamic amount of concurrent threads?

I am launching concurrent threads doing some stuff:

concurrent = 10
q = Queue(concurrent * 2)
for j in range(concurrent):
    t = threading.Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    # process each line and assign it to an available thread
    for line in call_file:
        q.put(line)
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

At the same time I have a distinct thread counting time:

def printit():
    threading.Timer(1.0, printit).start()
    print current_status

printit()

I would like to increase (or decrease) the amount of concurrent threads for the main process let's say every minute. I can make a time counter in the time thread and make it do things every minute but how to change the amount of concurrent threads in the main process ?

Is it possible (and if yes how) to do that ?

Upvotes: 1

Views: 241

Answers (2)

J. Goedhart
J. Goedhart

Reputation: 209

This is my worker:

def UpdateProcesses(start,processnumber,CachesThatRequireCalculating,CachesThatAreBeingCalculated,CacheDict,CacheLock,IdleLock,FileDictionary,MetaDataDict,CacheIndexDict):
NewPool()
while start[processnumber]:
    IdleLock.wait()
    while len(CachesThatRequireCalculating)>0 and start[processnumber] == True:
        CacheLock.acquire()
        try:
            cacheCode = CachesThatRequireCalculating[0] # The list can be empty if an other process takes the last item during the CacheLock 
            CachesThatRequireCalculating.remove(cacheCode)
            print cacheCode,"starts processing by",processnumber,"process"
        except:
            CacheLock.release()
        else:
            CacheLock.release()
            CachesThatAreBeingCalculated.append(cacheCode[:3])
            Array,b,f = TIPP.LoadArray(FileDictionary[cacheCode[:2]])#opens the dask array
            Array = ((Array[:,:,CacheIndexDict[cacheCode[:2]][cacheCode[2]]:CacheIndexDict[cacheCode[:2]][cacheCode[2]+1]].compute()/2.**(MetaDataDict[cacheCode[:2]]["Bit Depth"])*255.).astype(np.uint16)).transpose([1,0,2]) #slices and calculates the array
            f.close() #close the file
            if CachesThatAreBeingCalculated.count(cacheCode[:3]) != 0: #if not, this cache is not needed annymore (the cacheCode is removed bij wavelengthchange)
                CachesThatAreBeingCalculated.remove(cacheCode[:3])
                try: #If the first time the object if not aivalable try a second time
                    CacheDict[cacheCode[:3]] = Array
                except:
                    CacheDict[cacheCode[:3]] = Array
                print cacheCode,"done processing by",processnumber,"process"
    if start[processnumber]:
        IdleLock.clear()

This is how I start them:

    self.ProcessLst = [] #list with all the processes who calculate the caches
    for processnumber in range(min(NumberOfMaxProcess,self.processes)):
        self.ProcessTerminateLst.append(True)
    for processnumber in range(min(NumberOfMaxProcess,self.processes)):
        self.ProcessLst.append(process.Process(target=Proc.UpdateProcesses,args=(self.ProcessTerminateLst,processnumber,self.CachesThatRequireCalculating,self.CachesThatAreBeingCalculated,self.CacheDict,self.CacheLock,self.IdleLock,self.FileDictionary,self.MetaDataDict,self.CacheIndexDict,)))
        self.ProcessLst[-1].daemon = True
        self.ProcessLst[-1].start()

I close them like this:

    for i in range(len(self.ProcessLst)): #For both while loops in the processes self.ProcessTerminateLst[i] must be True. So or the process is now ready to be terminad or is still in idle mode.
        self.ProcessTerminateLst[i] = False

    self.IdleLock.set() #Makes sure no process is in Idle and all are ready to be terminated

Upvotes: 1

J. Goedhart
J. Goedhart

Reputation: 209

I would use a pool. a pool has a max number of threads it uses at the same time, but you can apply inf number of jobs. They stay in the waiting list until a thread is available. I don't think you can change number of current processes in the pool.

Upvotes: 1

Related Questions