Reputation: 3567
I am currently using this lib to stress test a kafka server that I have set up: https://github.com/dsully/pykafka
import kafka
import time

def test_kafka_server(n=1):
    for i in range(0,n):
        producer = kafka.producer.Producer('test',host='10.137.8.192')
        message = kafka.message.Message(str(time.time()))
        producer.send(message)
        producer.disconnect()

def main():
    test_kafka_server(100000)

if __name__ == '__main__':
    main()
What ends up happening is that I overload my own local machine.
I get error 10055, which according to Google means that "Windows has run out of TCP/IP socket buffers because too many connections are open at once." According to netstat, producer.disconnect() is not closing the socket, but rather putting it in a TIME_WAIT state.
The ipython debugger points to this line:
C:\Python27\lib\socket.pyc in meth(name, self, *args)
222 proto = property(lambda self: self._sock.proto, doc="the socket protocol")
223
--> 224 def meth(name,self,*args):
225 return getattr(self._sock,name)(*args)
226
as the culprit, but this then seems to get into messing with things at a lower level than I am comfortable with.
I searched and found the question "Python socket doesn't close connection properly", which recommended doing:
setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
so, I rebuilt the pykafka lib using that option in the io.py file:
def connect(self):
    """ Connect to the Kafka server. """
    global socket
    self.socket = socket.socket()
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.connect((self.host, self.port))
and I still get the same error.
Am I not putting the setsockopt line in the right spot? Is there anything else I could be trying?
Upvotes: 5
Views: 3744
Reputation: 43034
What you are describing is normal TCP behavior at the socket level. When a user-level program closes a socket, the kernel does not free it right away. The connection enters the TIME_WAIT state:
TIME-WAIT (either server or client) represents waiting for enough time to pass to be sure the remote TCP received the acknowledgment of its connection termination request. According to RFC 793 a connection can stay in TIME-WAIT for a maximum of four minutes, which is two MSLs (maximum segment lifetimes).
So the socket is closed. socket.SO_REUSEADDR is for listeners (servers) and does not affect outgoing client connections. Strictly speaking, it only matters when binding a socket to a local address.
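Since every closed connection lingers in TIME_WAIT, the usual way to avoid running out of sockets is to open one connection and send many messages over it. Here is a minimal sketch using plain sockets rather than pykafka. The helper names (start_counting_server, send_many, run_demo) and the local throwaway server are made up for illustration. It also shows where SO_REUSEADDR belongs: on the listening socket, before bind().

```python
import socket
import threading


def start_counting_server(host="127.0.0.1"):
    """Hypothetical throwaway server: accepts one client and counts bytes."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_REUSEADDR is set on the listener before bind(); that is where it applies.
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, 0))  # port 0 lets the OS pick a free port
    listener.listen(1)
    port = listener.getsockname()[1]
    stats = {"connections": 0, "bytes": 0}

    def serve_one_client():
        conn, _ = listener.accept()
        stats["connections"] += 1
        while True:
            data = conn.recv(4096)
            if not data:  # empty read: the client closed its end
                break
            stats["bytes"] += len(data)
        conn.close()
        listener.close()

    thread = threading.Thread(target=serve_one_client)
    thread.daemon = True
    thread.start()
    return port, stats, thread


def send_many(port, n, host="127.0.0.1"):
    """Send n messages over ONE connection instead of opening n connections."""
    client = socket.create_connection((host, port))
    sent = 0
    try:
        for i in range(n):
            payload = ("message %d\n" % i).encode("ascii")
            client.sendall(payload)
            sent += len(payload)
    finally:
        client.close()  # a single close, so at most one socket in TIME_WAIT
    return sent


def run_demo(n=1000):
    port, stats, thread = start_counting_server()
    sent = send_many(port, n)
    thread.join(5)  # wait for the server to finish reading
    return sent, stats


if __name__ == "__main__":
    sent, stats = run_demo()
    print("sent %d bytes over %d connection(s)" % (sent, stats["connections"]))
```

Applied to the code in the question, this would mean creating the Producer once before the loop and calling disconnect() once after it, assuming the library allows send() to be called repeatedly on the same producer.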
Upvotes: 6