Reputation: 25629
How do speed up this test code in python to Redis on Winxp using python 2.7? Would multiprocessing be better? The load rate in 6000/s vs publish 100,000/s rates. I chose 100,000, but could lower in testing. The process takes 15 seconds.
Would changing setting on server help???
import time
from time import strftime
import redis
import threading, Queue
start_time = time.time()
cxn = redis.StrictRedis('127.0.0.1',6379,1)
class WorkerMain(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while 1:
try: # take a job from the queue
row = self.queue.get_nowait()
except Queue.Empty: raise SystemExit
try:
cxn.set(row, "Row")
#print (row, "Row")
except: print 'Setup Error'
if __name__ == '__main__':
connections = 5
sml = range(1,100000)
queue = Queue.Queue()
for row in sml:
queue.put(str(row))
threads = []
for dummy in range(connections):
t = WorkerMain(queue)
t.start()
threads.append(t)
# wait for all threads to finish
for thread in threads:
thread.join()
print
end_time = time.time()
duration = end_time - start_time
print "Duration: %s" % duration
Used the code below for mulitprocessing and "monitored" the data with CLI...not all data went into the server.
from multiprocessing import Pool
import time
import redis
start_time = time.time()
cxn = redis.Redis('127.0.0.1',6379,1)
def rset(var):
cxn.set(var,"value")
if __name__ =='__main__':
sml = range(1,10000)
#for x in sml:print x
pool = Pool(processes=5)
for row in sml:
pool.apply_async(rset, [(row,)])
#print result.get(),
end_time = time.time()
duration = end_time - start_time
print "Duration: %s" % duration
Here is the pipelined code...... I just commented out the threading stuff.
from time import strftime
import redis
import threading, Queue
start_time = time.time()
cxn = redis.StrictRedis('127.0.0.1',6379,0)
pipe = cxn.pipeline(transaction=False)
class WorkerMain(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while 1:
try: # take a job from the queue
row = self.queue.get_nowait()
except Queue.Empty: raise SystemExit
try:
cxn.set(row, "Row")
#print (row, "ROw")
except: print 'Setup Error'
if __name__ == '__main__':
#connections = 5
sml = range(1,100000)
#queue = Queue.Queue()
for row in sml:
#queue.put(str(row))
pipe.set(str(row),"value").execute()# key, value
# threads = []
# for dummy in range(connections):
# t = WorkerMain(queue)
# t.start()
# threads.append(t)
#
# # wait for all threads to finish
# for thread in threads:
# thread.join()
print
end_time = time.time()
duration = end_time - start_time
print "Duration: %s" % duration
Upvotes: 1
Views: 2402
Reputation: 31528
Use Pipelines. A Pipeline batches commands so you don't pay for network overheads.
See :
Upvotes: 2
Reputation: 10433
Using threading for better performance is not a really good idea if you use cpython (the standard python interpreter) because of the gil.
http://wiki.python.org/moin/GlobalInterpreterLock
multiprocessing should work better
Upvotes: 1