Selah

Reputation: 8064

Process very large 900M row MySQL table line by line with Python

I often need to process several hundred million rows of a MySQL table, row by row, using Python. I want a script that is robust and does not need to be monitored.

Below I have pasted a script that classifies the language of the message field in each row. It uses the sqlalchemy and MySQLdb.cursors.SSCursor modules. Unfortunately, this script consistently throws a 'Lost connection to MySQL server during query' error, after 4840 rows when I run it remotely and after 42000 rows when I run it locally.

Also, I have checked that max_allowed_packet = 32M in my MySQL server's /etc/mysql/my.cnf file, as per the answers to the Stack Overflow question "Lost connection to MySQL server during query".
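
For what it is worth, here is how I confirm that the running server actually picked up that value (a minimal sketch using MySQLdb; the credentials file path is whatever you use):

import os
import MySQLdb

conn = MySQLdb.connect(read_default_file=os.path.expanduser("~/.my.cnf"))
cur = conn.cursor()
# Ask the server for the value it is actually using; this can differ
# from my.cnf if the server was not restarted after the edit.
cur.execute("SHOW VARIABLES LIKE 'max_allowed_packet'")
print cur.fetchone()  # e.g. ('max_allowed_packet', '33554432') for 32M
conn.close()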

Any advice on either fixing this error, or on another approach to processing very large MySQL tables with Python in a robust way, would be much appreciated!

import sqlalchemy
import MySQLdb.cursors
import langid

schema = "twitterstuff"
table = "messages_en" #900M row table
engine_url = "mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema)
# Use a server-side cursor so the result set is streamed rather than
# loaded into memory all at once.
db_eng = sqlalchemy.create_engine(engine_url, connect_args={'cursorclass': MySQLdb.cursors.SSCursor})
langid.set_languages(['fr', 'de'])

print "Executing input query..."
data_iter = db_eng.execute("SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT 10000".format(table))

def process(inp_iter):
    for item in inp_iter:
        item = dict(item)
        (item['langid_lang'], item['langid_conf']) = langid.classify(item['message'])
        yield item

def update_table(update_iter):
    count = 0
    for item in update_iter:
        count += 1
        if count % 10 == 0:
            print "{} rows processed".format(count)
        lang = item['langid_lang']
        conf = item['langid_conf']
        message_id = item['message_id']
        db_eng.execute("UPDATE {} SET langid_lang = '{}', langid_conf = {} WHERE message_id = {}".format(table, lang, conf, message_id))

data_iter_upd = process(data_iter)

print "Begin processing..."
update_table(data_iter_upd)

Upvotes: 2

Views: 601

Answers (1)

unutbu

Reputation: 879471

According to MySQLdb developer Andy Dustman,

[When using SSCursor,] no new queries can be issued on the connection until the entire result set has been fetched.

That post says that if you issue another query you will get a "commands out of sync" error, which is not the error you are seeing. So I am not sure that the following will necessarily fix your problem. Nevertheless, it might be worth removing SSCursor from your code and using the simpler default Cursor, just to test whether SSCursor is the source of the problem.
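
To make the quoted restriction concrete, here is a minimal sketch (untested; the table name and credentials file are placeholders) of what happens when a second query is issued while an SSCursor result set is still being streamed:

import os
import MySQLdb
import MySQLdb.cursors

conn = MySQLdb.connect(read_default_file=os.path.expanduser("~/.my.cnf"),
                       cursorclass=MySQLdb.cursors.SSCursor)
cur = conn.cursor()
cur.execute("SELECT message_id, message FROM messages_en")
cur.fetchone()            # only part of the streamed result set consumed
cur2 = conn.cursor()
cur2.execute("SELECT 1")  # expected to raise ProgrammingError 2014,
                          # "Commands out of sync; you can't run this command now"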

You could, for example, use LIMIT chunksize in your SELECT statement and loop until the query comes back empty. Note that because each UPDATE removes the processed rows from the WHERE langid_lang IS NULL result set, the query should be re-run without an advancing OFFSET; stepping OFFSET forward by chunksize would skip over still-unprocessed rows:

import sqlalchemy
import MySQLdb.cursors
import langid

chunksize = 1000

def process(inp_iter):
    for item in inp_iter:
        item = dict(item)
        (item['langid_lang'], item['langid_conf']) = langid.classify(item['message'])
        yield item

def update_table(update_iter, engine):
    for count, item in enumerate(update_iter):
        if count % 10 == 0:
            print "{} rows processed".format(count)
        lang = item['langid_lang']
        conf = item['langid_conf']
        message_id = item['message_id']
        engine.execute(
            "UPDATE {} SET langid_lang = '{}', langid_conf = {} WHERE message_id = {}"
            .format(table, lang, conf, message_id))

schema = "twitterstuff"
table = "messages_en" #900M row table
engine_url = ("mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf"
              .format(schema))

db_eng = sqlalchemy.create_engine(engine_url)
langid.set_languages(['fr', 'de'])

while True:
    print "Executing input query..."
    result = db_eng.execute(
        "SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT {}"
        .format(table, chunksize))
    # Materialize the chunk so the SELECT's result set is fully fetched
    # before any UPDATEs are issued on the connection.
    result = list(result)
    if not result:
        break
    data_iter_upd = process(result)

    print "Begin processing..."
    update_table(data_iter_upd, db_eng)
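
If you would rather keep SSCursor's streaming read, another way to respect the quoted restriction is to open two connections: a streaming connection dedicated to the one big SELECT, and a second, ordinary connection for the UPDATEs, so that no new query is ever issued on the streaming connection. A minimal sketch (untested; it reuses your engine URL and assumes the same ~/.my.cnf credentials work for both engines):

import sqlalchemy
import MySQLdb.cursors
import langid

schema = "twitterstuff"
table = "messages_en"  #900M row table
engine_url = ("mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf"
              .format(schema))

# Streaming engine for the one big read; plain engine for the writes.
read_eng = sqlalchemy.create_engine(
    engine_url, connect_args={'cursorclass': MySQLdb.cursors.SSCursor})
write_eng = sqlalchemy.create_engine(engine_url)
langid.set_languages(['fr', 'de'])

rows = read_eng.execute(
    "SELECT message_id, message FROM {} WHERE langid_lang IS NULL"
    .format(table))
for count, row in enumerate(rows):
    lang, conf = langid.classify(row['message'])
    write_eng.execute(
        "UPDATE {} SET langid_lang = %s, langid_conf = %s WHERE message_id = %s"
        .format(table), (lang, conf, row['message_id']))
    if count % 10000 == 0:
        print "{} rows processed".format(count)

One caveat: if the per-row processing falls far behind the read, the server can still drop a slow streaming consumer (the net_write_timeout server variable governs this), which may be exactly the 'Lost connection' error you are seeing; raising that timeout would be worth trying as well.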

Upvotes: 1
