Barath Ravikumar
Barath Ravikumar

Reputation: 5836

Pass data asynchronously to a Python Server class

I have to pass a data from my test cases to a mock server. What is the best way to do that ?

This is what I have so far

mock_server.py

class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
    pass

class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
    def __init__(self, request, client_address, server):
        SocketServer.BaseRequestHandler.__init__(self,request,client_address,server)

    def handle(self):
        print server.data #this is where i need the data 

class server_wrap:
     def __init__(self):
        self.server = ThreadedUDPServer( ("127.0.0.1",49555) , ThreadedUDPRequestHandler)

     def set_data(self,data)
        self.server.data = data

     def start(self)
        server_thread = threading.Thread(target=self.server.serve_forever())

     def stop(self)
        self.server.shutdown()

test_mock.py

server_inst = server_wrap()
server_inst.start()
#code which sets the data and expects the handle method to print the data set
server_inst.stop()

The problem which i have with this code is, the execution stops at server_inst.start(), where the server goes in to an infinite listening mode

Other Solutions that I have tried, but failed:

Let me know about any other possible solutions. Thanks in advance

Update 1:

Using separate threads to send data to the socket:

Changes

test_mock.py

def test_set_data(data)
    server_inst = server_wrap()
    server_inst.set_data(data)
    server_inst.start()

if __name__ == "__main__":
    thread = Thread(target=test_set_data, args=("foo_data))
    thread.setDaemon(True)
    thread.start()
    #test code which verifies if data set is same
    #works so far, able to pass data

    #problem starts now
    thread = Thread(target=test_set_data, args=("bar_data))
    thread.setDaemon(True)
    thread.start()
    #says address already in use error
    #Tried calling server.shuddown() in handle , but error persists. Also there is no thread.shop in threading.Thread object

Thanks

Upvotes: 0

Views: 159

Answers (1)

Aviah Laor
Aviah Laor

Reputation: 3658

The server should go to listening mode.

You don't need the server_inst.stop until all the data was sent, and the test finishes. Maybe in you test tear down, or when the the test suite is completed.

To send data to the server, and let the handle pick it, you should open a socket on anohter thread. Then send the data to the server via this socket.

This code should look something like this:

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    
sock.connect(("127.0.0.1",49555))
sock.send(... the data ...)
received = sock.recv(1024) # the handle can send a response
sock.close()

Add a function in your django code, which does run on another thread. This function will open the socket, connect, send the data and get the response. You can call it from a view, a middleware etc.

Upvotes: 1

Related Questions