Michael B

Reputation: 101

Python - multithreaded sockets

From my understanding, Python can only run one thread at a time, so if I were to do something like this:

import socket, select
from threading import Thread
import config

class Source(Thread):
    def __init__(self):
        Thread.__init__(self)
        self._wait = False
        self._host = (config.HOST, config.PORT + 1)
        self._socket = socket.socket()
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock = None
        self._source = None  # set on a successful LOGIN; checked by MOUNT
        self._connections = []
        self._mount = "None"
        self._writers = []
        self._createServer()

    def _createServer(self):
        self._socket.bind(self._host)
        self._socket.listen(2)
        self._connections.append(self._socket)
        self._audioPackets=[]

    def _addPacket(self, packet):
        self._audioPackets.append(packet)

    def _removePacket(self, packet):
        self._audioPackets.remove(packet)

    def _getPacket(self):
        if len(self._audioPackets) > 0:
            return self._audioPackets[0]
        else:
            return None

    def _sendOK(self, sock):
        sock.send("OK")

    def _sendDenied(self, sock):
        sock.send("DENIED")

    def _sendMount(self, sock):
        sock.send("mount:{0}".format(self._mount))

    def _sendBufPacket(self, sock, packet):
        packet = "buffer:%s" % packet
        sock.send(packet)

    def recv(self, sock, data):
        data = data.split(":", 1)
        if data[0] == "WAIT": self._wait = True
        elif data[0] == "STOP_WAITING": self._wait = False
        elif data[0] == "LOGIN":
            if data[1] == config.SOURCE_AUTH:
                self._source = sock
                self._sendOK(sock)
            else:
                self._sendClose(sock)
        elif data[0] == "MOUNT":
            if self._source == sock:
                self._mount = data[1]
            else:
                self._sendClose(sock)

        elif data[0] == "CLIENT":
            self._sendMount(sock)
            self._writers.append(sock)


    def _sendCloseAll(self):
        for sock in self._connections:
            sock.send("CLOSE")
            sock.close()

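    def _sendClose(self, sock):
        sock.send("CLOSE")
        sock.close()
        # Drop the closed socket so select() is not handed a dead descriptor
        if sock in self._writers:
            self._writers.remove(sock)
        if sock in self._connections:
            self._connections.remove(sock)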

    def main(self):
        while True:
            rl, wl, xl = select.select(self._connections, self._writers, [], 0.2)
            for sock in rl:
                if sock == self._socket:
                    con, ip = sock.accept()
                    self._connections.append(con)
                else:
                    data = sock.recv(config.BUFFER)
                    if data:
                        self.recv(sock, data)
                    else:
                        if sock in self._writers:
                            self._writers.remove(sock)
                        if sock in self._connections:
                            self._connections.remove(sock)
            for sock in wl:
                packet = self._getPacket()
                if packet != None:
                    self._sendBufPacket(sock, packet)

    def run(self):
        self.main()

class writeThread(Thread):
    def __init__(self):
        Thread.__init__(self)  # required before start() can be called
        self.running = False

    def make(self, client):
        self.client = client
        self.running = True

    def run(self):
        host = (config.HOST, config.PORT + 1)
        sock = socket.socket()
        sock.connect(host)
        sock.send("CLIENT")
        sock.send("MOUNT:mountpoint")
        while self.running:
            data = sock.recv(config.BUFFER)
            if data:
                data = data.split(":", 1)
                if data[0] == "buffer":
                    self.client.send(data[1])
                elif data[0] == "CLOSE":
                    self.client.close()
                    break


if __name__=="__main__":
    source = Source()
    source.start()
    webserver = WebServer()
    webserver.runloop()

If I need to build the web server part, I will, but I'll explain it. Basically, when someone connects to the web server under the mount point that was set, they get their own personal thread, which then grabs the data from Source() and sends it to them. Now say another person connects to the mount point while the previous client and the source are still running. Wouldn't the new client be blocked from getting the Source data, considering there are already two active threads?

Upvotes: 2

Views: 1646

Answers (2)

jberry

Reputation: 1

Okay, I have copied and pasted some Python 3 code that I wrote for a project I am currently working on. With modification, you can make this code serve your purposes.

The code uses multiprocessing and multithreading. For my purposes, I am using multiprocessing so that the sockets run in one process and I can run a GUI program in another. You can remove the multiprocessing part if you prefer. The code below runs a socket message server. The server accepts clients one at a time. Once a client has connected, a new thread is started to handle all communication between the server and that client. The server then goes back to waiting for more clients. At the moment, however, the server only listens to data sent by each client and prints it to the terminal. With a small amount of effort, you can modify my code to send information from the server to each client individually.

import multiprocessing
import threading
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

class ThreadedServer(object):

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.sock = socket(AF_INET, SOCK_STREAM)
        self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        self.sock.bind((self.host, self.port))

    def listen(self):
        self.sock.listen(3)  # Backlog: up to 3 pending connections may queue
        while True:
            # accept() blocks until the next client connects
            print("Searching for Client")
            client, address = self.sock.accept()
            print(address, " is connected")
            #client.settimeout(60)

            # Once a client has connected, start an individual client thread
            d = threading.Thread(target=self.listenToClient, args=(client, address))
            d.daemon = True
            d.start()

    def listenToClient(self, client, address):
        size = 1024
        try:
            while True:
                data = client.recv(size)
                if not data:
                    break  # Empty read: the client disconnected
                print(data)
                #client.send(response)
        except OSError:
            pass  # Treat socket errors as a disconnect
        finally:
            client.close()

def dataSharingHost():
    # Using sockets to send information between processes
    # This is the server function
    # ThreadedServer(host_ip, port_number); '' binds to all available interfaces
    ThreadedServer('', 8000).listen()

def Main():
    commServer = multiprocessing.Process(target=dataSharingHost, args=())
    commServer.daemon = True
    commServer.start()
    # In my project the parent goes on to run the GUI. Here we just wait,
    # because a daemon process is terminated as soon as its parent exits.
    commServer.join()

if __name__ == '__main__':
    Main()

To be fair, my code is adapted from https://www.youtube.com/watch?v=qELZAi4yra8 . The client code is covered in those videos; I think the third video covers multiple client connections.
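As a rough illustration of sending data from the server to each client individually, here is a minimal sketch that gives every client thread its own queue. The names Broadcaster, register, publish and client_worker are made up for this example, the None shutdown marker is an assumption, and the `send` callable stands in for a real client.send so the sketch runs without any network setup.

```python
import queue
import threading

class Broadcaster:
    """Hypothetical helper: copies each published packet to one queue per client."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queues = []

    def register(self):
        # Called once per client; returns the queue that client should read from
        q = queue.Queue()
        with self._lock:
            self._queues.append(q)
        return q

    def publish(self, packet):
        # Snapshot the list under the lock, then hand the packet to every client
        with self._lock:
            targets = list(self._queues)
        for q in targets:
            q.put(packet)


def client_worker(q, send):
    """Runs in one thread per client; `send` stands in for client.send."""
    while True:
        packet = q.get()      # blocks only this thread, not the others
        if packet is None:    # assumed shutdown marker, not part of any protocol
            break
        send(packet)


broadcaster = Broadcaster()
received = {"a": [], "b": []}   # two pretend clients, collecting what they are sent
workers = []
for name in received:
    t = threading.Thread(target=client_worker,
                         args=(broadcaster.register(), received[name].append))
    t.start()
    workers.append(t)

for packet in (b"one", b"two", None):
    broadcaster.publish(packet)
for t in workers:
    t.join()
print(received)
```

Each client gets every packet in order, and a slow client only delays its own queue. In the server above, register() would probably be called right after accept(), with client.send passed as `send`.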

Upvotes: 0

Cyb3rFly3r

Reputation: 1341

Your understanding of how threads work in Python seems to be incorrect, based on the question you are asking. Used correctly, threads do not block one another: you can run multiple threads in Python. The limitation is that, because of the Global Interpreter Lock (GIL), only one thread executes Python bytecode at a time, so you do not get the full parallelism expected from threaded programs (e.g. simultaneous execution and thus reduced runtime). For CPU-bound work, the two threads together will take roughly as long as they would if executed sequentially (although that is not necessarily what happens in practice). Blocking socket calls such as recv() and send() release the GIL while they wait, so one client thread waiting on the network does not stop another thread from running.
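A small sketch of that point, assuming CPython: one thread waits in recv() while a second thread is still free to run and send it data. The socket pair here only stands in for a real client/server connection, and the function names are made up for the example.

```python
import socket
import threading

# Two connected sockets, standing in for a client and a server
a, b = socket.socketpair()
result = []

def reader():
    # recv() may block this thread; the GIL is released while it waits
    result.append(a.recv(1024))

def writer():
    # Can run even if reader() is currently blocked inside recv()
    b.sendall(b"hello")

t1 = threading.Thread(target=reader)
t2 = threading.Thread(target=writer)
t1.start()
t2.start()
t1.join(timeout=5)
t2.join(timeout=5)
a.close()
b.close()
print(result)
```

If a thread blocked in recv() stopped every other thread, the writer could never run and the reader would never return. Instead, both threads finish and the reader ends up with the bytes the writer sent.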

Upvotes: 2
