pkdkk
pkdkk

Reputation: 3963

Using MySQL table as a queue with threads in Python

I have a DB with a queue table, new entries are inserted continuously in the queue.

I want a Python script to execute the queue as fast as possible, and I think I need some threaded code to do so, running like a daemon.

But I can't figure out how to use the DB as the queue.

I am looking at this example:

import MySQLdb
from Queue import Queue
from threading import Thread

def do_stuff(q):
    while True:
        print q.get()
        q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(q,))
    worker.setDaemon(True)
    worker.start()

// TODO:  Use the DB
db = MySQLdb.connect(...)
cursor = db.cursor()
q = cursor.execute("SELECT * FROM queue")

for x in range(100):
    q.put(x)
q.join()

Upvotes: 3

Views: 3021

Answers (2)

Abhishek Pathak
Abhishek Pathak

Reputation: 1569

2 quick points :

  1. Assuming you are using cPython, The GIL will effectively render threading useless, allowing only 1 thread through the interpreter at one time. Couple of workarounds are :

    • The Gevent library [source]

      gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

    • The multiprocessing module, you can spawn multiple processes - this is true concurrency in python.

    • The concurrent.futures module - new in python 3, port available for python 2. [source]

      This is a new high-level library that operates only at a “job” level, which means that you no longer have to fuss with
      synchronization, or managing threads or processes. you just specify a thread or process pool with a certain number of “workers,” submit
      jobs, and collate the results. It’s new in Python 3.2, but a port for Python 2.6+ is available at http://code.google.com/p/pythonfutures.

You can use the SSDictCursor() of MySQLdb and do a fetchone().This is a streaming cursor and you can run this in an infinite while() loop to resemble a queue:

cur = MySQLdb.cursors.SSDictCursor()

cur.execute(query)

while True:

row = cursor.fetchone()

if not row : break # (or sleep()!)

else: # other
  1. Having said all that, I would suggest you look at implementing tools like celery or mongodb to emulate queues and workers. Relational databases are just not cut out for that kind of a job and suffer unnecessary fragmentation. Here's a great source if you want to know more about fragmentation in mysql.

Upvotes: 3

Kuishi
Kuishi

Reputation: 157

I am not sure if its the best solution but I think of a structure of a main-thread which reads the db and fill the Queue. Make sure to avoid doublets. Maybe by using primary key of increasing numbers would be easy to check.

The Worker-Structure is nice, but like mentioned in comments: the GIL will avoid any boost. But you could use multiprocessing if your "do_stuff" is independent from the script himself (f.e. the tasks are pictures and the "do_stuff" is "rotate ervery picture 90°"). Afaik it doesn't suffer from GIL

https://docs.python.org/2/library/subprocess.html get you some informations about that.

PS: English isn't my native language.

Upvotes: 1

Related Questions