user2368915
user2368915

Reputation: 1

When I try to run this code, I get the following error: PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

What is wrong with my code? Thank you

import os
import os.path
import time
global item_count
#-*- coding: cp936 -*-
import MySQLdb
import MySQLdb.cursors
import threading
import multiprocessing
from time import sleep,ctime


def qucun():

    #connect to mysql
    conn=MySQLdb.connect(host="localhost",user="root",passwd="caihong")
    cursor=conn.cursor()
    try:
        cursor.execute("""create database if not exists quad""")
    except:
        print 'Quad is exist'


    conn.select_db('quad')
    conn=MySQLdb.connect(host="localhost",user="root",passwd="caihong",db="quad")
    #get cursor
    cursor=conn.cursor()

    try:
        cursor.execute("""create table if not exists record(fn1 varchar(100),
        fn2 varchar(100),fn3 varchar(100),fn4 varchar(100),
        fn5 varchar(100),fn6 varchar(100),fn7 varchar(100),fn8 varchar(100))""")
    except:
        print 'Table record is exist'

    loops=['2013071818_1.txt','2013071818_2.txt','2013071818_3.txt','2013071818_4.txt','2013071818_5.txt']




    def loop(nloop,filename):


        print 'This loop%s start at:'%nloop,ctime()
        #connect to quad
        conn=MySQLdb.connect(host="localhost",user="root",passwd="caihong",db="quad")
        conn.select_db('quad')
        #get cursor
        cursor=conn.cursor()

        newitem=open('C:\\Python27\\caihong\\%s'%filename,'r')
        data=[line.strip() for line in newitem.readlines()]
        print data
        ##put data into value
        values=['%s'%data[0],'%s'%data[1],'%s'%data[2],'%s'%data[3],'%s'%data[4],
                '%s'%data[5],'%s'%data[6],'%s'%data[7]]

        cursor.execute("""insert into record values(%s,%s,%s,%s,%s,%s,%s,%s)""",values);   

        conn.commit()
        cursor.close()
        sleep(2)
        print 'This loop done at',ctime()



    if __name__=='__main__':

        print 'starting at:',ctime()
        threads=[]
        nloops=range(len(loops))
        pool=multiprocessing.Pool(processes=2)
        for i in nloops:
            t=pool.apply_async(loop,(i,loops[i]))


        pool.close()
        pool.join()

        if t.successful():
            print 'successful'




        print 'all Done at:',ctime()
        os.system("pause")

qucun()

Upvotes: 0

Views: 286

Answers (1)

denz
denz

Reputation: 386

  1. You are attempting to pass a locally defined function to an asynchronous call.
  2. You are trying to share an open connection between processes.

The first is tricky to get right in Python 2.7, and the second is impossible with multiprocessing in any case.

You have to use a separate connection for each process in the process pool.

import os
import os.path
import time
global item_count
#-*- coding: cp936 -*-
import MySQLdb
import MySQLdb.cursors
import threading
import multiprocessing
from time import sleep,ctime

# Per-process connection cache: each pool worker process gets its own
# MySQLdb connection stored here (module globals are not shared between
# processes, so every worker initializes its own copy).
CONNECTION = None

def close_connection():
    # Finalizer registered in get_connection(); runs once per worker,
    # right before the pool shuts down.
    CONNECTION.close()

def get_connection():
    global CONNECTION

    #If this process pool member launched for a first time - create connection
    if CONNECTION is None:

        conn = MySQLdb.connect( host="localhost",
                                user="root",
                                passwd="caihong")
        cursor = conn.cursor()
        try:
            cursor.execute("""create database if not exists quad""")
        except:
            print 'Quad is exist'

        conn.select_db('quad')

        CONNECTION = MySQLdb.connect(host="localhost",
                                     user="root",
                                     passwd="caihong",
                                     db="quad")

        cursor = CONNECTION.cursor()

        try:
            cursor.execute("""create table if not exists record(fn1 varchar(100),
            fn2 varchar(100),fn3 varchar(100),fn4 varchar(100),
            fn5 varchar(100),fn6 varchar(100),fn7 varchar(100),fn8 varchar(100))""")
        except:
            print 'Table record is exist'

        # we dont need to close connection after each insert.
        # insted - register a finalizer once
        # so it will be called right before Pool.close()
        multiprocessing.util.Finalize(CONNECTION, close_connection, exitpriority=1)

    #use existing connection
    return CONNECTION

def loop(nloop, filename):
    conn = get_connection()
    cursor = conn.cursor()

    print 'This loop %s start at: %s'%(nloop, ctime())
    with open('C:\\Python27\\caihong\\%s'%filename, 'r') as newitem:
        data = [line.strip() for line in newitem.readlines()]

        # values=['%s'%data[0],'%s'%data[1],'%s'%data[2],'%s'%data[3],'%s'%data[4],
        #         '%s'%data[5],'%s'%data[6],'%s'%data[7]]
        # ^^^ Thats a bad way to stringify list

        cursor.execute('insert into record values(%s)', ','.join(data));

    conn.commit()

    # we dont need to close connection after each insert. 
    # cursor.close()
    print 'This loop done at', ctime()


LOOPS = ['2013071818_1.txt', '2013071818_2.txt', '2013071818_3.txt', '2013071818_4.txt', '2013071818_5.txt']

if __name__=='__main__':
    pool = multiprocessing.Pool(processes=2)
    results = []
    for i, loopfile in enumerate(LOOPS):
        results.apply(pool.apply_async(loop, (i, loopfile)))

    pool.close()
    pool.join()

    if all((res.successful() for res in results)):
        print 'successful'

    print 'all Done at:', ctime()
        os.system('pause')

Upvotes: 1

Related Questions