user3467349
user3467349

Reputation: 3191

Python Running Multiple Locks across Multiple Threads

So the situation is that I have multiple methods which might be run in separate threads simultaneously, but each one needs its own lock so that it cannot be started again in another thread until its current run has finished. They are established by initialising a class with some data-processing options:

class InfrequentDataDaemon(object):
    """Empty holder class.

    It is handed to ``addMethod`` together with ``slow_process_types``
    inside ``DataProcessors.__init__``.
    """
class FrequentDataDaemon(object):
    """Empty holder class.

    It is handed to ``addMethod`` together with ``fast_process_types``
    inside ``DataProcessors.__init__``.
    """

def addMethod(name):
    """Return a decorator that attaches a function to ``name`` as a static method.

    Args:
        name: The object that receives the function as an attribute. The
            visible call sites pass a class here.

    Returns:
        A decorator. Applied to a function ``f`` it stores
        ``staticmethod(f)`` on ``name`` under ``f.__name__`` and returns
        ``f`` itself, unchanged.
    """
    def wrapper(f):
        # Attach to the object that was passed in. Writing to a separate
        # global instead would ignore the argument entirely.
        setattr(name, f.__name__, staticmethod(f))
        return f
    return wrapper
    
class DataProcessors(object):
    """Create one processing function per data type and register daemon hooks.

    ``options`` is indexed with the keys ``'common_settings'``,
    ``'data_processing_configurations'`` and ``'data_processing_types'``.
    The generated functions are stored in ``self.Data_Processing_Functions``
    under the name ``"Processing_for_<type>"``.
    """

    # A single lock object shared by the whole class. Every worker thread
    # started below is handed this same lock, so processing runs are
    # serialised across ALL data types, not per type.
    lock = threading.Lock()

    def __init__(self, options):
        self.common_settings = options['common_settings']

        # Configs for each processing method
        self.data_processing_configurations = options['data_processing_configurations']
        self.data_processing_types = options['data_processing_types']
        self.Data_Processing_Functions = {}

        # Each processing method is created as a separate function so that
        # it can be locked.
        for type_name in options['data_processing_types']:
            def bindFunction1(name):
                def func1(self, data=None, lock=None):
                    # Get the right config for the datatype
                    config = self.data_processing_configurations[data['type']]
                    with lock:
                        FetchDataBaseStuff(data['type'])
                        # I don't want this to be run more than once at a time per DataProcessing Type
                        # But it's fine if multiple DoSomethings run at once, as long as each DataType is different!
                        DoSomething(data, config)
                        WriteToDataBase(data['type'])
                func1.__name__ = "Processing_for_{}".format(name)
                # Add this function to the dictionary object
                self.Data_Processing_Functions[func1.__name__] = func1
            bindFunction1(type_name)

        # Then add some methods to a daemon that are going to check whether
        # our data processors need to be called.
        def fast_process_types(data):
            if example_condition is not True:
                return
            # Check that we are doing something with this type of data
            if data['type'] not in self.data_processing_types:
                return
            worker = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            # The class attribute must be reached through self; a bare
            # `lock` is not visible from inside this nested function.
            threading.Thread(target=worker, args=(self, data, self.lock)).start()

        def slow_process_types(data):
            if some_other_condition is not True:
                return
            # Check that we are doing something with this type of data
            if data['type'] not in self.data_processing_types:
                return
            worker = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            threading.Thread(target=worker, args=(self, data, self.lock)).start()

        addMethod(InfrequentDataDaemon)(slow_process_types)
        addMethod(FrequentDataDaemon)(fast_process_types)

The idea is to lock each method in `DataProcessors.Data_Processing_Functions`, so that each method is only run by one thread at a time (and any other threads for the same method are queued). How does the locking need to be set up to achieve this effect?

Upvotes: 0

Views: 6208

Answers (1)

dano
dano

Reputation: 94881

I'm not sure I completely follow what you're trying to do here, but could you just create a separate threading.Lock object for each type?

class DataProcessors(object):
    """Create one processing function and one lock per data type.

    ``options`` is indexed with the keys ``'common_settings'``,
    ``'data_processing_configurations'`` and ``'data_processing_types'``.
    The generated functions are stored in ``self.Data_Processing_Functions``
    under the name ``"Processing_for_<type>"``; the matching locks live in
    ``self.locks`` keyed by type.
    """

    def __init__(self, options):
        self.common_settings = options['common_settings']

        # Configs for each processing method
        self.data_processing_configurations = options['data_processing_configurations']
        self.data_processing_types = options['data_processing_types']
        self.Data_Processing_Functions = {}
        self.locks = {}

        # Each processing method is created as a separate function so that
        # it can be locked.
        for type_name in options['data_processing_types']:
            # A distinct lock per type: runs for the same type wait on each
            # other, while runs for different types hold different locks.
            self.locks[type_name] = threading.Lock()

            def bindFunction1(name):
                def func1(self, data=None):
                    # Get the right config for the datatype
                    config = self.data_processing_configurations[data['type']]
                    with self.locks[data['type']]:
                        FetchDataBaseStuff(data['type'])
                        DoSomething(data, config)
                        WriteToDataBase(data['type'])
                func1.__name__ = "Processing_for_{}".format(name)
                # Add this function to the dictionary object
                self.Data_Processing_Functions[func1.__name__] = func1
            bindFunction1(type_name)

        # Then add some methods to a daemon that are going to check whether
        # our data processors need to be called.
        def fast_process_types(data):
            if example_condition is not True:
                return
            # Check that we are doing something with this type of data
            if data['type'] not in self.data_processing_types:
                return
            worker = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            threading.Thread(target=worker, args=(self, data)).start()

        def slow_process_types(data):
            if some_other_condition is not True:
                return
            # Check that we are doing something with this type of data
            if data['type'] not in self.data_processing_types:
                return
            worker = self.Data_Processing_Functions["Processing_for_{}".format(data['type'])]
            threading.Thread(target=worker, args=(self, data)).start()

        addMethod(InfrequentDataDaemon)(slow_process_types)
        addMethod(FrequentDataDaemon)(fast_process_types)

Upvotes: 1

Related Questions