Reputation: 1661
I have a number of files (over 4000) that I want to simultaneously load into PostgreSQL. I have separated them into 4 different file lists and I want a thread to iterate through each list loading the data.
The problem I have is that if I use os.system to call the loading program, this prevents the other threads from running simultaneously. If I use subprocess.Popen then they run simultaneously, but the threads believe they have finished executing and so move on to the next part of my script.
Am I doing this the right way? Or is there a better way to call subprocesses from within a thread?
def thread1Load(self, thread1fileList):
    """Launch the load command once for each file in thread1fileList.

    Used as the target of the first worker thread created in main().
    """
    # Connection parameters are read from the `settings` object.
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword
    tablename = None
    encoding = None
    connection = psycopg2.connect(connectionstring)
    for filename in thread1fileList:
        load_cmd = #load command
        # Popen returns as soon as the child process has been started; it
        # does not wait for the load to complete, so the loop carries on
        # immediately with the next file.
        run = subprocess.Popen(load_cmd, shell=True)
    # Reached once every subprocess has been launched, not necessarily finished.
    print "finished loading thread 1"
def thread2Load(self, thread2fileList):
    """Launch the load command once for each file in thread2fileList.

    Used as the target of the second worker thread created in main().
    """
    # Connection parameters are read from the `settings` object.
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword
    tablename = None
    connection = psycopg2.connect(connectionstring)
    for filename in thread2fileList:
        load_cmd = #load command
        # Popen returns as soon as the child process has been started; it
        # does not wait for the load to complete, so the loop carries on
        # immediately with the next file.
        run = subprocess.Popen(load_cmd, shell=True)
    # Reached once every subprocess has been launched, not necessarily finished.
    print "finished loading thread 2"
def thread3Load(self, thread3fileList):
    """Launch the load command once for each file in thread3fileList.

    Used as the target of the third worker thread created in main().
    """
    # Connection parameters are read from the `settings` object.
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword
    tablename = None
    connection = psycopg2.connect(connectionstring)
    # Note: the loop variable is named `shapefilename` here, whereas the other
    # three loader methods call it `filename`.
    for shapefilename in thread3fileList:
        load_cmd = #load command
        # Popen returns as soon as the child process has been started; it
        # does not wait for the load to complete, so the loop carries on
        # immediately with the next file.
        run = subprocess.Popen(load_cmd, shell=True)
    # Reached once every subprocess has been launched, not necessarily finished.
    print "finished loading thread 3"
def thread4Load(self, thread4fileList):
    """Launch the load command once for each file in thread4fileList.

    Used as the target of the fourth worker thread created in main().
    """
    # Connection parameters are read from the `settings` object.
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword
    tablename = None
    connection = psycopg2.connect(connectionstring)
    for filename in thread4fileList:
        load_cmd = #load command
        # Popen returns as soon as the child process has been started; it
        # does not wait for the load to complete, so the loop carries on
        # immediately with the next file.
        run = subprocess.Popen(load_cmd, shell=True)
    # Reached once every subprocess has been launched, not necessarily finished.
    print "finished loading thread 4"
def finishUp(self):
    """Print a message marking the final step of the script."""
    print 'finishing up'
def main():
    """Create one thread per file list, run them, then call finishUp."""
    load = Loader()
    # One thread per file list, each bound to its own loader method.
    thread1 = threading.Thread(target=(load.thread1Load), args=(thread1fileList, ))
    thread2 = threading.Thread(target=(load.thread2Load), args=(thread2fileList, ))
    thread3 = threading.Thread(target=(load.thread3Load), args=(thread3fileList, ))
    thread4 = threading.Thread(target=(load.thread4Load), args=(thread4fileList, ))
    threads = [thread1, thread2, thread3, thread4]
    for thread in threads:
        thread.start()
        # join() blocks until this thread has ended, so the next thread is
        # not started until the current one is done.
        thread.join()
    # NOTE(review): finishUp is declared as finishUp(self), so passing an
    # argument here does not match that signature; `connectionstring` is also
    # not assigned anywhere in this function.
    load.finishUp(connectionstring)
# Run main() only when this file is executed as a script.
if __name__ == '__main__':
    main()
Upvotes: 3
Views: 6947
Reputation: 879671
A single threadLoad method suffices. That way, if you need to modify something in the method, you do not need to make the same modification in 4 different places.
Use run.communicate() to block until the subprocess is done.
This starts one thread, then blocks until that thread finishes, then starts another thread, etc.:
for thread in threads:
    thread.start()
    # join() blocks here, so the next thread is not started until this one ends.
    thread.join()
Instead, start all the threads first, then join all the threads:
# First pass: start every thread so they all run at the same time.
for thread in threads:
    thread.start()
# Second pass: wait for each thread to finish.
for thread in threads:
    thread.join()
import subprocess
import threading
class Loader(object):
    """Runs the external load command over lists of files."""

    def threadLoad(self, threadfileList):
        """Run the load command for each file in threadfileList, one at a time.

        Meant to be used as a threading.Thread target: every worker thread
        calls this same method with its own file list.
        """
        connectionstring = settings.connectionstring
        ...
        connection = psycopg2.connect(connectionstring)
        for filename in threadfileList:
            load_cmd = # load command
            run = subprocess.Popen(load_cmd, shell=True)
            # block until subprocess is done
            run.communicate()
        # The current thread's name tells the workers' messages apart.
        name = threading.current_thread().name
        print "finished loading {n}".format(n=name)

    def finishUp(self):
        """Print a message marking the final step of the script."""
        print 'finishing up'
def main():
    """Load every file list in its own thread, then run the final step.

    All threads are started before any of them is joined, so they run
    concurrently; finishUp() is called only after every thread has ended.
    """
    load = Loader()
    # One thread per file list, all sharing the single threadLoad method.
    threads = [threading.Thread(target=load.threadLoad, args=(fileList, ))
               for fileList in (thread1fileList, thread2fileList,
                                thread3fileList, thread4fileList)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # finishUp() accepts no arguments besides self, and no `connectionstring`
    # name is defined in this function, so it is called without arguments.
    load.finishUp()
# Run main() only when this file is executed as a script.
if __name__ == '__main__':
    main()
Upvotes: 7