peiman F.

Reputation: 1658

Implementing a socket in a thread

I have to read some data from a database and send it over a TCP socket, so I fetch the data from the database:

    #main
    while True:
        cursor.execute("UPDATE `raw` SET `zombie`='"+zombieId+"' WHERE result='pending' AND protocol='tcp' AND zombie='0' LIMIT 1;")
        # time.sleep(0.2)
        cursor.execute("select * from raw WHERE `result`='pending' AND `protocol`='tcp' and `zombie`='"+zombieId+"' limit 1;")

        if cursor.rowcount>0 :
            waitedSig = cursor.fetchone()
            time.sleep(0.2)
            t = threading.Thread(target=sendData , args=((waitedSig),))
            t.start()
            time.sleep(0.6)
        else:
            time.sleep(1)

In the thread I send the data to the target:

    def sendData(sig):
        timedata = datetime.datetime.fromtimestamp(int(sig[16]))
        devimei = sig[23]
        devdate = timedata.strftime("%d%m%y")
        devtime = timedata.strftime("%H%M%S")
        lat = format(sig[2])
        lon = format(sig[3])
        satcount = format(sig[5])
        speed = format(sig[4])
        batery = format(sig[7])
        if sig[9]>1000:
            band='00'
        elif sig[9]>850:
            band='02'
        else:
            band='01'
        hdop = format(sig[10])
        gsmQ = format(sig[6])
        lac = format(sig[12])
        cid = format(sig[13])
        # named "message" rather than "str" so the built-in str() used below still works
        message='$MGV002,'+devimei+',12345,S,'+devdate+','+devtime+',A,'+lat+',N,'+lon+',E,0,'+satcount+',00,'+hdop+','+speed+',0,,,432,11,'+lac+','
        try:
            clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            result = clientsocket.connect(('ip',port))
            clientsocket.send(message)  # on Python 3, send bytes: message.encode()
            data = clientsocket.recv(1024)
            print(str(datetime.datetime.now())+' -> send completed :'+format(sig[0]))
            clientsocket.close()
        except:
            print(str(datetime.datetime.now())+' -> connection to tcp server failed!!')

This works well, but there are two annoying problems:

1) If I remove the 0.2 and 0.6 second sleep delays, the script crashes due to duplicate socket usage. It seems the system tries to open another socket before the previous one has finished its job.

2) If something goes wrong in the sendData function, the whole script stops working until I restart it manually.

So:

1) Can I create a thread queue so the threads run one after another and don't affect each other?

2) How can I handle errors in the thread function so that only that specific thread closes and the script continues with the next database record?

Upvotes: 0

Views: 51

Answers (1)

tdelaney

Reputation: 77337

This looks like a good application of a thread pool. In your implementation you create one thread and one socket per item in your database table, which could put a heavy load on the system. Here I've created 20 workers as an example. There are diminishing returns on the number of workers as you start to stress the system.

    import datetime
    import multiprocessing.pool
    import socket

    def sender():
        pool = multiprocessing.pool.ThreadPool(20) # pick your size...
        cursor.execute("select * from database")
        # chunksize=1 hands rows to the workers one at a time
        pool.map(sendData, cursor, chunksize=1)

    def sendData(sig):
        try:
            clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            result = clientsocket.connect(('ip',port))
            clientsocket.sendall(sig)  # sig stands in for the message built from the row
            data = clientsocket.recv(1024)
            print(str(datetime.datetime.now())+' -> send completed :'+format(sig[0]))
            # SHUT_RDWR is the shutdown flag; SOCK_RDWR does not exist
            clientsocket.shutdown(socket.SHUT_RDWR)
            clientsocket.close()
        except:
            print(str(datetime.datetime.now())+' -> connection to tcp server failed!!')
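
To address the second question more directly, here is a minimal sketch of a bounded pool that contains each failure to the record that caused it, so one bad record does not stop the rest. It assumes Python 3 and uses `concurrent.futures` instead of `multiprocessing.pool`. The names `send_record` and `send_all`, the timeout value, and the idea of passing the sender in as a callable are illustrative assumptions, not part of the original code.

```python
import concurrent.futures
import socket


def send_record(message, host, port, timeout=5.0):
    """Send one message over its own TCP connection and return the reply.

    Hypothetical helper: each call opens a fresh socket, so workers
    never share a connection.
    """
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(message.encode("ascii"))
        return conn.recv(1024)


def send_all(records, send, workers=4):
    """Run send(record) for every record on a bounded thread pool.

    An exception raised for one record is caught and stored with that
    record; the remaining records are still processed.
    """
    sent, failed = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        # Map each future back to the record it was created for.
        futures = {pool.submit(send, record): record for record in records}
        for future in concurrent.futures.as_completed(futures):
            record = futures[future]
            try:
                future.result()  # re-raises whatever the worker raised
            except Exception as exc:
                failed.append((record, exc))
            else:
                sent.append(record)
    return sent, failed
```

With a real server, `send` could be something like `functools.partial(send_record, host=..., port=...)`. Records that end up in `failed` can be logged or marked for retry in the database while the main loop moves on to the next batch.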

Upvotes: 1
