goetzmoritz
goetzmoritz

Reputation: 453

multithreading sockets in python

I'm trying to build a non-blocking UDP server that listens on several ports and receives data packets until a timeout occurs. Unfortunately, I cannot change the client side, and UDP is mandatory. Receiving files works fine. The issue is that creating the workers is a blocking operation. I'd like it to be non-blocking so that all workers run in parallel. Also, each worker should run in a loop such as `while True`, but that was blocking, too.

Here is my code:

#!/usr/bin/env python
from socket import *
import sys
import select
import threading
# Collects the value returned by each worker(port) call made at the bottom of
# this script. worker() has no return statement, so only None values end up
# here; no threading.Thread objects are created anywhere in this script.
threads = []

def worker(port):
    """Receive one file over UDP on ``port`` and store it as ``<port>.data``.

    Binds a datagram socket to the hard-coded host address and blocks (with
    no timeout) until two datagrams have arrived.  The first datagram is
    consumed but not stored; the second and all following ones are written
    to the output file until no datagram arrives for one second or an empty
    datagram is received.

    The socket and the output file are always released, even when binding,
    opening the file, or receiving fails.

    :param port: UDP port to listen on; also used as the output file's stem.
    """
    host = "192.168.88.51"
    buf = 128
    timed_out = False
    s = socket(AF_INET, SOCK_DGRAM)
    try:
        s.bind((host, port))
        # The first datagram is read and dropped (presumably an announcement
        # or file-name packet from the client -- verify against the sender).
        s.recvfrom(buf)
        filename = str(port) + ".data"
        print(str(port) + " received File:")
        with open(filename, 'wb') as f:
            data, _ = s.recvfrom(buf)
            # From here on, one second of silence means the transfer ended.
            s.settimeout(1)
            try:
                while data:
                    f.write(data)
                    data, _ = s.recvfrom(buf)
            except timeout:
                timed_out = True
    finally:
        s.close()
    # Report success only after the file and the socket have been closed.
    if timed_out:
        print("File Downloaded")

# worker() is called directly here, so each call has to return before the
# next port is handled; its return value is what gets stored in ``threads``.
for port in range(1300, 1305):
    threads.append(worker(port))

Upvotes: 0

Views: 132

Answers (2)

Dalen
Dalen

Reputation: 4236

This will work as you intended, except that it overwrites the file anew each time a new transfer starts, i.e. whenever data arrives after the previous transfer has timed out. A timeout represents the end of a whole transfer ("connection"). But you can easily rework this to append the data to the same file, create a new file, or do whatever you need.


#! /usr/bin/env python
from socket import AF_INET, SOCK_DGRAM
import socket
import threading

class Server(threading.Thread):
    """Thread that receives UDP datagrams on one port and writes them to a file.

    Each transfer starts with the first datagram that arrives while the
    server is idle and ends when no datagram arrives for one second (or an
    empty datagram is received).  Every transfer is written to
    ``<port>.data``, replacing the file's previous content.

    :param host: local address to bind to.
    :param port: UDP port to bind to; also used as the output file's stem.
    :param bufsize: maximum number of bytes read per datagram.
    """

    def __init__(self, host="192.168.88.51", port=123, bufsize=128):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.bufsize = bufsize
        # Set by stop() to ask the receive loops to finish.
        self.done = threading.Event()
        # The socket is created by run(); it stays None until then so that
        # stop() is safe to call at any time.
        self.s = None

    def opensock(self):
        """Create the UDP socket, bind it and put it into short-poll mode.

        :returns: the bound socket with a 1 ms receive timeout.
        :raises socket.error: if binding fails; the socket is closed first.
        """
        s = socket.socket(AF_INET, SOCK_DGRAM)
        try:
            s.bind((self.host, self.port))
            # A very short timeout lets run() re-check the stop flag often.
            s.settimeout(0.001)
        except Exception:
            s.close()
            raise
        return s

    def run(self):
        """Thread body: wait for transfers until stop() is called.

        The socket is closed by this thread (and only by this thread) when
        the loop ends, whether normally or because of an error.
        """
        host = self.host
        port = self.port
        self.s = s = self.opensock()
        try:
            print("Waiting for connection on " + host + ":" + str(port))
            while not self.done.is_set():
                try:
                    data, addr = s.recvfrom(self.bufsize)
                except socket.timeout:
                    # Nothing arrived within the poll interval; check the
                    # stop flag again.
                    continue
                print("Connection from %s" % (addr,))
                # During a transfer, one second of silence ends the file.
                s.settimeout(1)
                self.recvdata(data, s, addr)
                s.settimeout(0.001)
        finally:
            s.close()
        print("Server on '%s:%s' stopped!" % (host, port))

    def recvdata(self, initdata, conn, addr):
        """Write one transfer to ``<port>.data``.

        :param initdata: the datagram that started the transfer; it is
            written first.
        :param conn: socket to read the remaining datagrams from; its
            timeout decides when the transfer is considered finished.
        :param addr: sender address of ``initdata`` (used for logging only;
            later datagrams are accepted from any sender).
        """
        bufsize = self.bufsize
        filename = str(self.port) + ".data"
        print("Opening file %s" % filename)
        # "wb" truncates, so each transfer replaces the previous file; the
        # with-block guarantees the file is closed even if receiving fails.
        with open(filename, "wb") as f:
            print("Receiving & writingrest of data from %s" % (addr,))
            data = initdata
            while data and not self.done.is_set():
                f.write(data)
                try:
                    data, addr = conn.recvfrom(bufsize)
                except socket.timeout:
                    break
        if self.done.is_set():
            print("Forcefully interrupted transmission")
        else:
            print("File Downloaded")

    def stop(self):
        """Ask the server thread to finish; safe to call at any time.

        Only the stop flag is set here.  The socket is not closed from the
        calling thread, because closing it while the server thread is inside
        ``recvfrom`` makes that call fail with a socket error.  The server
        thread notices the flag within its receive timeout (1 ms when idle,
        up to one second during a transfer) and closes the socket itself.
        Use ``join()`` to wait for that.
        """
        self.done.set()

# Launch one Server thread per port in 123..149 and remember those that
# could be created and started.
servers = []
for port in xrange(123, 150):
    try:
        srv = Server(port=port)
        srv.start()
    except Exception as e:
        print(e)
    else:
        servers.append(srv)

# Manual smoke test: send three datagrams to one randomly chosen server.
raw_input("Press enter to send data to one of ports for testing . . . ")
import random
target_host = servers[0].host
target_port = random.choice(servers).port
print("data will be sent to port '%s:%i'" % (target_host, target_port))
client = socket.socket(AF_INET, SOCK_DGRAM)
client.connect((target_host, target_port))
for payload in ("1234567890", "asdfghjkl", "0987654321"):
    client.send(payload)
client.close()
raw_input("Press enter to close the program . . . ")

# Ask every server to stop before waiting on any of them, so that they all
# shut down concurrently.
for srv in servers:
    srv.stop()

# Wait until every server thread has actually finished.
for srv in servers:
    srv.join()

Upvotes: 1

goetzmoritz
goetzmoritz

Reputation: 453

That did it. Figured it out myself.

#!/usr/bin/env python
from socket import *
import sys
import select
import multiprocessing

def worker(port):
    """Receive one file over UDP on ``port`` and store it as ``<port>.jpg``.

    Binds a datagram socket to the hard-coded host address and blocks (with
    no timeout) until two datagrams have arrived.  The first datagram is
    consumed but not stored; the second and all following ones are written
    to the output file until no datagram arrives for one second or an empty
    datagram is received.

    The socket and the output file are always released, even when binding,
    opening the file, or receiving fails.

    :param port: UDP port to listen on; also used as the output file's stem.
    """
    print("started: " + str(port))
    host = "192.168.88.51"
    buf = 128
    timed_out = False
    s = socket(AF_INET, SOCK_DGRAM)
    try:
        s.bind((host, port))
        # The first datagram is read and dropped (presumably an announcement
        # or file-name packet from the client -- verify against the sender).
        s.recvfrom(buf)
        filename = str(port) + ".jpg"
        print(str(port) + " received File:")
        with open(filename, 'wb') as f:
            data, _ = s.recvfrom(buf)
            # From here on, one second of silence means the transfer ended.
            s.settimeout(1)
            try:
                while data:
                    f.write(data)
                    data, _ = s.recvfrom(buf)
            except timeout:
                timed_out = True
    finally:
        s.close()
    # Report success only after the file and the socket have been closed.
    if timed_out:
        print("File Downloaded")

# The __main__ guard is required by multiprocessing on platforms that start
# child processes by re-importing this module (e.g. Windows); without it each
# child would try to spawn the workers again.
if __name__ == "__main__":
    processes = []
    # Start one receiver process per port so that all ports listen in parallel.
    for port in range(1300, 1305):
        proc = multiprocessing.Process(target=worker, args=(port,))
        proc.start()
        processes.append(proc)

    # Wait for every receiver to finish its transfer before exiting.
    for proc in processes:
        proc.join()

Upvotes: 0

Related Questions