Reputation: 5836
I have to pass a data from my test cases to a mock server. What is the best way to do that ?
This is what I have so far
mock_server.py
class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
pass
class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
def __init__(self, request, client_address, server):
SocketServer.BaseRequestHandler.__init__(self,request,client_address,server)
def handle(self):
print server.data #this is where i need the data
class server_wrap:
def __init__(self):
self.server = ThreadedUDPServer( ("127.0.0.1",49555) , ThreadedUDPRequestHandler)
def set_data(self,data)
self.server.data = data
def start(self)
server_thread = threading.Thread(target=self.server.serve_forever())
def stop(self)
self.server.shutdown()
test_mock.py
server_inst = server_wrap()
server_inst.start()
#code which sets the data and expects the handle method to print the data set
server_inst.stop()
The problem which i have with this code is, the execution stops at server_inst.start(), where the server goes in to an infinite listening mode
Other Solutions that I have tried, but failed:
Let me know about any other possible solutions. Thanks in advance
Update 1:
Using separate threads to send data to the socket:
Changes
test_mock.py
def test_set_data(data)
server_inst = server_wrap()
server_inst.set_data(data)
server_inst.start()
if __name__ == "__main__":
thread = Thread(target=test_set_data, args=("foo_data))
thread.setDaemon(True)
thread.start()
#test code which verifies if data set is same
#works so far, able to pass data
#problem starts now
thread = Thread(target=test_set_data, args=("bar_data))
thread.setDaemon(True)
thread.start()
#says address already in use error
#Tried calling server.shuddown() in handle , but error persists. Also there is no thread.shop in threading.Thread object
Thanks
Upvotes: 0
Views: 159
Reputation: 3658
The server should go to listening mode.
You don't need the server_inst.stop
until all the data was sent, and the test finishes. Maybe in you test tear down, or when the the test suite is completed.
To send data to the server, and let the handle pick it, you should open a socket on anohter thread. Then send the data to the server via this socket.
This code should look something like this:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1",49555))
sock.send(... the data ...)
received = sock.recv(1024) # the handle can send a response
sock.close()
Add a function in your django code, which does run on another thread. This function will open the socket, connect, send the data and get the response. You can call it from a view, a middleware etc.
Upvotes: 1