lukik

Reputation: 4060

This Non-Threaded script unexpectedly runs faster than the Threaded version

I have a Python script which validates data fetched from some rows in a database and then logs the errors to a different table in the same database. The script validates each row and marks it as validated and has_error = True/False depending on the validation outcome. This process is repeated for each row. With that, I thought I'd add some steroids by creating threads, such that the validation of each row is done by an independent thread, thus reducing the time it takes to validate a batch of rows.

To my surprise, I find that the threaded script is taking slightly longer than the non-threaded one. On average, to validate 1,502 rows of data the non-threaded script takes 1.5 seconds while the threaded script takes 2.27 seconds. That might not be much, but ideally I'll be running through 2 million records at a go, so that time overhead will be significant. Plus, I would assume that threaded apps would finish faster! :-)

The two scripts clock the same time, about 0.01 seconds, up to the point of creating threads. By this point the SQLAlchemy session is created and all the data to be validated and its relations, i.e. foreign keys etc., are fetched. From there though, the non-threaded script finishes faster. Below is my code.

1.0 Non-Threaded Script

#A lot of code goes above this to fetch the data that is passed on to the validator function.
#However, the two scripts are identical up to this point in terms of time taken, so I didn't see the need to post it.
for lf_detail_id in load_file_detail_id:
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
           data[lf_detail_counter], template_version[lf_counter], \
           load_file_detail, error, dt_file, dt_columns 
    data_list.append(params)
    lf_detail_counter += 1
    no_of_records += 1

validator = Validate()
validator.validator(no_of_records, data_list)
record_counter += lf_detail_counter
data_list = None
no_of_records = 0
print("Validated '%s': seconds %s" %(filename[lf_counter], time.time()-file_start_time))     #print time it took to run'

#Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)

#Reset counters
lf_detail_counter = 0
lf_counter += 1

#Commit The Entire Transaction.
session.commit()
print("NoThread:Finished validating %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()- process_start_time))

1.1. Validation Function for Non-Threaded Script

class Validate():
    has_error = False   #Initialise to False so the first valid record is marked correctly
    def validator(self, loop_length, job):                
        '''Validate data'''
        for row_counter in range(loop_length):
            load_file_detail_id, load_file_id, entry_number, data, \
            template_version, load_file_detail, error, dt_file, dt_columns = job[row_counter]
            error_detail = ErrorLogDetail()
            if data.strip() == "":
                error_detail.errorlog = error
                error_detail.load_file_detail_id = load_file_detail_id
                error_detail.pos_row = entry_number
                error_detail.pos_col = None
                error_detail.value_provided = None
                error_detail.column_name = None
                error_detail.description = "error message 1"
                session.add(error_detail)
                error_detail = ErrorLogDetail()
                self.has_error = True
                self.set_validation(load_file_detail, load_file_detail_id, True, False)
                continue
            elif len(data) != int(dt_file.data_length):
                error_detail.errorlog = error
                error_detail.load_file_detail_id = load_file_detail_id
                error_detail.pos_row = entry_number   
                error_detail.pos_col = None
                error_detail.column_name = None
                error_detail.value_provided = None
                error_detail.description = "error message 2"
                session.add(error_detail)
                error_detail = ErrorLogDetail()
                self.has_error = True
                self.set_validation(load_file_detail, load_file_detail_id, True, False)  
                continue
            else:
                pass    #Continue with extra validation (elided here)

            #If the record passes all validation then mark it as has_error = False
            if self.has_error == False:
                self.set_validation(load_file_detail, load_file_detail_id, False, True)
            else:
                self.has_error = False
            #In the threaded script, jobs.task_done() is called here; the line does not appear in this non-threaded version

2.0 Threaded Script

#A lot of code goes above this to fetch the data that is passed on to the validator function.
#However, the two scripts are identical up to this point in terms of time taken, so I didn't see the need to post it.
for lf_detail_id in load_file_detail_id:
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
           data[lf_detail_counter], template_version[lf_counter], \
           load_file_detail, error, dt_file, dt_columns 
    data_list.append(params)
    lf_detail_counter += 1
    queue_size += 1
    if queue_size == THREAD_LIMIT:
        myqueuing(queue_size, data_list)
        queue_size = 0

#spawn a pool of threads, and pass them queue instance 
if queue_size > 0:
    myqueuing(queue_size, data_list)

#Keep record of rows processed
record_counter += lf_detail_counter 
print("Validated '%s': seconds- %s " %(filename[lf_counter], time.time()-file_start_time))     #print time it took to run'

#Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)

#Commit The Entire Transaction.
session.commit()
#Reset counters
lf_detail_counter = 0
lf_counter += 1
data_list = None
queue_size = 0              
print("HasThread:Finished loading %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()-process_start_time))     #print time it took to run'

2.1. Threaded Validation Function

THREAD_LIMIT = 50                # This is how many threads we want
jobs = queue.Queue()           # This sets up the job queue; unbounded, since no maxsize is given
singlelock = threading.Lock()   # This is a lock so threads don't print through each other (and other reasons)

def myqueuing(queuesize, data):
    '''Put the fetched data in a queue and instantiate threads to
    process the queue'''
    # Spawn the threads
    is_valid_date("20131212", True) #Calling this here to avoid a bug in time.strptime() when threading
    for x in range(queuesize):
        # This is the thread class that we instantiate.
        workerbee().start()

    # Put stuff in queue
    for i in range(queuesize):
        # Block if the queue is full and wait 2 seconds; after that, raise queue.Full.
        try:
            jobs.put(data[i], block=True, timeout=2)
        except queue.Full:
            singlelock.acquire()
            print("The queue is full!")
            singlelock.release()

    # Wait for the threads to finish
    singlelock.acquire()        # Acquire the lock so we can print
    print ("Waiting for threads to finish.")
    singlelock.release()       # Release the lock
    jobs.join()                 # This command waits for all threads to finish.             


class workerbee(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.has_error = False

    def run(self):
        job = jobs.get(True,1)
        load_file_detail_id, load_file_id, entry_number, data, \
        template_version, load_file_detail, error, dt_file, dt_columns = job                
        '''Validates the data.'''
        error_detail = ErrorLogDetail()
        #Again please note that this part is identical for both the non-threaded and the threaded script. 
        #After each pass on a record, the record is marked as validated and if has_error = True
        if data.strip() == "":
            error_detail.errorlog = error
            error_detail.load_file_detail_id = load_file_detail_id
            error_detail.pos_row = entry_number
            error_detail.pos_col = None
            error_detail.value_provided = None
            error_detail.column_name = None
            error_detail.value_provided = None
            error_detail.description = "erro message1"
            session.add(error_detail)
            error_detail = ErrorLogDetail()
            self.has_error = True
            self.set_validation(load_file_detail, load_file_detail_id, True, True)     
        elif len(data) != int(dt_file.data_length):
            error_detail.errorlog = error
            error_detail.load_file_detail_id = load_file_detail_id
            error_detail.pos_row = entry_number   
            error_detail.pos_col = None
            error_detail.column_name = None
            error_detail.value_provided = None
            error_detail.description = "erro message2")
            session.add(error_detail)
            error_detail = ErrorLogDetail()
            self.has_error = True
            self.set_validation(load_file_detail, load_file_detail_id, True, True)    
        else:
            pass    #Continue with further validation - about 5 other validation checks (elided here)

        #If the record passes all validation then mark it as has_error = False
        if self.has_error == False:
            self.set_validation(load_file_detail, load_file_detail_id, False, True)
        else:
            self.has_error = False
        jobs.task_done()    #Mark the job as done; this call exists only in the threaded script

3.0. Common function for setting validation in both threaded and non-threaded scripts

def set_validation(self, load_file_detail, load_file_detail_id, has_error, can_be_loaded):
    '''Mark the record as having been validated and whether has error = True or False'''
    #print("haserror and canbeloaded ", has_error, can_be_loaded)
    is_done = load_file_detail.set_validation_and_error(load_file_detail_id, True, has_error, can_be_loaded)
    if is_done == False:
        raise Exception ("Can't update load_file_detail's is_validated parameter: ", load_file_detail_id)                   

3.1. Actual SQLAlchemy session for saving the validation status

def set_validation_and_error(self, load_file_detail_id, is_validated, has_error, can_be_loaded):
    result = session.execute('UPDATE load_file_detail SET is_validated=%s, has_error=%s, can_be_loaded=%s WHERE id=%s' \
                    %(is_validated, has_error, can_be_loaded, load_file_detail_id))

So, the fetching of the data to be validated is the same, and both scripts take the same amount of time up to that point. The validation process is the same for both scripts, and saving to the DB is the same, i.e. sections 3.0 and 3.1 are shared by both scripts. The only difference is the validation with multiple threads. So I'm thinking: maybe there is something about multiple threads and SQLAlchemy that is making the app slower in threaded mode? Or have I implemented the threaded function improperly? Either one of those, or threading is simply not suitable in this scenario. Suggestions welcome.

Upvotes: 0

Views: 156

Answers (1)

eri

Reputation: 3524

You should create a Queue for logging and add a dedicated "logger" thread. That way you can remove the locks, and the code should be faster.
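A minimal sketch of that pattern (Python 2 style to match the test below; the names log_queue and logger_worker are illustrative, not from the code above):

import threading
import Queue

log_queue = Queue.Queue()       # workers put error records here instead of writing them out themselves

def logger_worker():
    # Single consumer: only this thread writes the log output,
    # so the worker threads need no print/log lock at all.
    while True:
        record = log_queue.get()
        if record is None:      # sentinel: shut the logger down
            break
        print(record)           # or add it to the error table through the one logger session

logger = threading.Thread(target=logger_worker)
logger.daemon = True
logger.start()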

Also, create a separate DB connection in each thread so the threads can talk to the database in parallel.
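With SQLAlchemy, for example, a scoped_session hands each thread its own session and connection; a rough sketch (the engine URL and the worker body are placeholders, not your actual code):

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine('postgresql://user:password@localhost/mydb')  # placeholder URL
Session = scoped_session(sessionmaker(bind=engine))

def validate_in_thread(row):
    session = Session()     # thread-local: each thread gets its own session on first use
    # ... run the validation checks and session.add() any error rows here ...
    session.commit()
    Session.remove()        # hand the connection back when this thread's work is done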

Threads only parallelize C-library calls (such as blocking I/O) because of the GIL.

To parallelize pure Python code you must use multiprocessing.

I wrote a test for you, showing how to process an iterable:

def produce_data(amount=100000, invalid=1, valid=10):
    # produce_data = sql('request').getall()
    import random
    id = 0
    data = [True] * valid + [False] * invalid
    while id < amount:
        id += 1
        yield (id, random.choice(data))


def validate(row):
    if row[1]:
        time.sleep(0.001) #set valid sql request emulation.
        return True
    else:
        time.sleep(0.001) #set invalid sql request emulation.
        return False



def single():
    for row in produce_data():
        validate(row)

def targeted():
    import threading
    for row in produce_data():
        threading.Thread(target=validate, args=(row,)).start()

Uley = 50

class Bee(object):
    error = False
    running = True

    def __init__(self, queue, *args, **kwargs):
        self.queue = queue  # don't use any global variable!
        # every bee must have a unique db connection and session.
        #self.session = db.connection().session()
        # initialize it there.
        return super(Bee, self).__init__(*args, **kwargs)

    def run(self):
        while self.running:
            data = self.queue.get()
            if data:
                self.error = validate(data)  # refactor to self.validate(data) to be able to get a cursor from self.session.
                self.queue.task_done()
            else:
                self.queue.task_done()
                break

        #self.session.commit()


def threaded():
    import threading, Queue

    class ThreadedBee(Bee, threading.Thread): pass

    q = Queue.Queue()

    for i in range(Uley):  # bees are started before the data is provided.
        bee = ThreadedBee(q)
        bee.daemon = True
        bee.start()

    for row in produce_data():  # you don't need to fetch all the data before processing begins; this could be a cursor over the response.
        q.put(row)

    q.join()
    for i in range(Uley):
        q.put(None)


def forked():
    from multiprocessing import Process,JoinableQueue
    class ForkedBee(Bee,Process): pass

    q = JoinableQueue()
    for i in range(Uley):
        bee=ForkedBee(q)
        bee.start()

    for row in produce_data():
        q.put(row)

    q.join()
    # at this point you need to kill the zomBees :-)
    for i in range(Uley):
        q.put(None)
    q.close()

def pool():
    from multiprocessing import Pool
    pool = Pool(processes=Uley)
    pool.map(validate,produce_data())

if __name__ == "__main__":
    import time
    s=time.time()
    single() 
    print(time.time()-s) #109
    s=time.time()
    single() 
    print(time.time()-s) #6
    s=time.time()
    threaded()
    print(time.time()-s) #12
    s=time.time()
    forked()
    print(time.time()-s) #6
    s=time.time()
    pool() 
    print(time.time()-s) #4

test result:

$ python2 tgreads.py 
109.779700994
5.84457302094
12.3814198971
5.97618508339
3.69856286049

targeted will flood the CPU and memory, and you can't provide individual connections to the DB; using a shared connection is not safe. If you want to go that way, you need to provide an output queue and implement a collector that communicates with the DB. pool is the shortest code and the fastest, but it is not friendly to initializing per-worker connections.
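If you do want per-worker connections with pool, the initializer argument of Pool runs once in each worker process; a rough sketch, reusing Uley and produce_data from the test above, with sqlite standing in for the real database (init_worker and validate_row are illustrative names):

from multiprocessing import Pool
import sqlite3

conn = None     # process-local connection, filled in by the initializer

def init_worker():
    # Runs once in every worker process, so each worker owns its own connection.
    global conn
    conn = sqlite3.connect(':memory:')  # stand-in for your real DB connection

def validate_row(row):
    # Each call can use the process-local 'conn' safely.
    return row[1]

if __name__ == '__main__':
    pool = Pool(processes=Uley, initializer=init_worker)
    print(sum(pool.map(validate_row, produce_data())))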

Upvotes: 2
