Reputation: 98941
What is the best approach to processing a socket connection where I need the variable data to end with a line break (\n)?
I'm using the code below, but sometimes the TCP packets get chunked and it takes a long time to match data.endswith("\n").
I've also tried other approaches, like saving the last line if it doesn't end with \n and appending it to data on the next loop, but this also doesn't work because multiple packets get chunked and the first and second parts don't match.
I have no control over the other end; it basically sends multiple lines that end in \r\n.
Any suggestions are welcome, as I don't have much knowledge of socket connections.
def receive_bar_updates(s):
    global all_bars
    data = ''
    buffer_size = 4096
    while True:
        data += s.recv(buffer_size)
        if not data.endswith("\n"):
            continue
        lines = data.split("\n")
        lines = filter(None, lines)
        for line in lines:
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
        data = ""
Example of "normal" data:
line1\r\n
line2\r\n
line3\r\n
Example of chunked data:
line1\r\n
line2\r\n
lin
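The carry-over idea mentioned above does work, as long as only the final incomplete piece is kept and prepended to the next chunk. A minimal sketch, assuming Python 3 (where recv() returns bytes) and the \r\n terminator described above; split_lines is a hypothetical helper name:

```python
from typing import List, Tuple


def split_lines(buffer: bytes, chunk: bytes) -> Tuple[List[bytes], bytes]:
    """Append a newly received chunk to the carry-over buffer and split off
    every complete CRLF-terminated line. The trailing, incomplete piece
    (possibly empty) is returned as the new carry-over buffer."""
    buffer += chunk
    *complete, remainder = buffer.split(b"\r\n")
    return complete, remainder


# Feeding the chunked example from above, one recv() result at a time:
buffer = b""
for chunk in (b"line1\r\nline2\r\nlin", b"e3\r\n"):
    lines, buffer = split_lines(buffer, chunk)
    print(lines, buffer)
# [b'line1', b'line2'] b'lin'
# [b'line3'] b''
```

Because the terminator is only matched after the chunk has been joined to the leftover, a \r\n split across two packets (b"ab\r" followed by b"\ncd") is still recognized.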
Upvotes: 3
Views: 19310
Reputation: 1572
Use socket.socket.makefile() to wrap the socket in a class that implements text I/O. It handles buffering, converts between bytes and strings, and lets you iterate over lines. Remember to flush any writes.
Example:
#!/usr/bin/env python3
import socket, threading, time

def client(addr):
    with socket.create_connection(addr) as conn:
        conn.sendall(b'aaa')
        time.sleep(1)
        conn.sendall(b'bbb\n')
        time.sleep(1)
        conn.sendall(b'cccddd\n')
        time.sleep(1)
        conn.sendall(b'eeefff')
        time.sleep(1)
        conn.sendall(b'\n')
        conn.shutdown(socket.SHUT_WR)
        response = conn.recv(1024)
        print('client got %r' % (response,))

def main():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as listen_socket:
        listen_socket.bind(('localhost', 0))
        listen_socket.listen(1)
        addr = listen_socket.getsockname()
        threading.Thread(target=client, args=(addr,)).start()
        conn, _addr = listen_socket.accept()
        conn_file = conn.makefile(mode='rw', encoding='utf-8')
        for request in conn_file:
            print('server got %r' % (request,))
        conn_file.write('response1\n')
        conn_file.flush()

if __name__ == '__main__':
    main()
$ ./example.py
server got 'aaabbb\n'
server got 'cccddd\n'
server got 'eeefff\n'
client got b'response1\n'
$
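Applied to the question's bar updates, the same makefile() approach might look like this. A sketch, assuming Python 3.5+ and that the peer sends UTF-8 text terminated by \r\n; read_bar_lines is a hypothetical name, and socket.socketpair() only stands in for the real connection:

```python
import socket
from typing import List


def read_bar_lines(sock: socket.socket) -> List[str]:
    """Collect every BH/BC line from the socket until the peer closes it.

    makefile() returns a text wrapper whose default newline=None enables
    universal-newlines mode, so each '\r\n' arrives as a single '\n'.
    """
    bars = []
    with sock.makefile(mode="r", encoding="utf-8") as sfile:
        for line in sfile:  # blocks until a full line (or EOF) arrives
            line = line.rstrip("\n")
            if line.startswith(("BH", "BC")):
                bars.append(line)
    return bars


if __name__ == "__main__":
    reader, writer = socket.socketpair()
    # Simulate the chunked stream from the question, split mid-line.
    writer.sendall(b"BH,AAPL,1\r\nBC,MS")
    writer.sendall(b"FT,2\r\nXX,IGNORED\r\n")
    writer.shutdown(socket.SHUT_WR)  # EOF ends the for-loop
    print(read_bar_lines(reader))  # ['BH,AAPL,1', 'BC,MSFT,2']
    reader.close()
    writer.close()
```

The file object does the reassembly, so the loop body never sees a partial line.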
Upvotes: 5
Reputation: 31
I have not tested this code, but it should work:
def receive_bar_updates(s):
    global all_bars
    data = ''
    buf = ''
    buffer_size = 4096
    while True:
        if not "\r\n" in data:  # skip recv if we already have another line buffered.
            data += s.recv(buffer_size)
        if not "\r\n" in data:
            continue
        i = data.rfind("\r\n")
        data, buf = data[:i+2], data[i+2:]
        lines = data.split("\r\n")
        lines = filter(None, lines)
        for line in lines:
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
        data = buf
Edit: Forgot to mention, I only modified the code for receiving the data; I have no idea what the rest of the function (starting with lines = data.split("\n")) is for.
Edit 2: Now uses "\r\n" for linebreaks instead of "\n".
Edit 3: Fixed an issue.
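The same carry-over logic can also be packaged as a generator that stops cleanly when the connection closes, which the loop above does not do (recv() keeps returning an empty result forever after EOF). A sketch, assuming Python 3 (recv() returns bytes) and \r\n terminators; iter_lines and its bufsize parameter are hypothetical names:

```python
import socket
from typing import Iterator


def iter_lines(sock: socket.socket, bufsize: int = 4096) -> Iterator[bytes]:
    """Yield CRLF-terminated lines (without the terminator) from a stream socket.

    Partial lines are carried over between recv() calls, and the generator
    stops when recv() returns b"", i.e. when the peer has closed its side.
    """
    buffer = b""
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:  # EOF: peer closed the connection
            break
        buffer += chunk
        # Everything before the last b"\r\n" is complete; the tail is kept.
        *lines, buffer = buffer.split(b"\r\n")
        yield from lines
    if buffer:  # unterminated data left at EOF; drop this if the protocol forbids it
        yield buffer
```

Usage would be along the lines of for line in iter_lines(s): ..., with the BH/BC handling from the question moved into the loop body.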
Upvotes: 0
Reputation: 173
Are you accepting different connections? Or is it one stream of data, split up by \r\n's?
When accepting multiple connections, you'd wait for a connection with s.accept() and then process all its data. When you have all of the packet, process its data and wait for the next connection.
What you do then depends on what the structure of each packet would be.
(Example: https://wiki.python.org/moin/TcpCommunication)
If instead you are consuming a stream of data, you should probably process each 'line' you find in a separate thread, while you keep consuming on another.
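A minimal sketch of that split, using one reader thread and one worker thread connected by a queue.Queue, which keeps lines in order instead of starting a new thread per line as the question's code does. It assumes Python 3; reader, worker and the STOP sentinel are hypothetical names, the BH/BC handling mirrors the question, and socket.socketpair() stands in for the real connection:

```python
import queue
import socket
import threading

STOP = None  # sentinel telling the worker that the stream has ended


def reader(sock, lines):
    """Producer: read CRLF-terminated lines from the socket and queue them."""
    with sock.makefile("rb") as sfile:
        for raw in sfile:  # bytes, still ending in b"\r\n"
            lines.put(raw.rstrip(b"\r\n").decode("utf-8"))
    lines.put(STOP)


def worker(lines, results):
    """Consumer: handle lines in arrival order until the sentinel arrives."""
    while True:
        line = lines.get()
        if line is STOP:
            break
        if line.startswith(("BH", "BC")):
            symbol = line.split(",")[1]
            results.setdefault(symbol, []).append(line)


if __name__ == "__main__":
    a, b = socket.socketpair()
    lines = queue.Queue()
    results = {}
    threads = [threading.Thread(target=reader, args=(a, lines)),
               threading.Thread(target=worker, args=(lines, results))]
    for t in threads:
        t.start()
    b.sendall(b"BH,AAPL,1\r\nBC,AAPL,2\r\nBH,MSFT,3\r\n")
    b.shutdown(socket.SHUT_WR)  # lets the reader see EOF
    for t in threads:
        t.join()
    print(results)  # {'AAPL': ['BH,AAPL,1', 'BC,AAPL,2'], 'MSFT': ['BH,MSFT,3']}
    a.close()
    b.close()
```

The reader never waits on processing, and the worker sees complete lines only.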
Edit:
So, if I have your situation right: one connection, the data being a string broken up by \r\n, ending with a \n. The data, however, does not match what you expect, and the loop instead keeps waiting for a \n forever.
The socket interface, as I understand it, signals the end of the stream with an empty result. So the last buffer might have ended with a \n, but then the loop just kept getting empty strings, trying to find another \n.
Instead, try checking for that empty result (test the chunk that recv() returned, not the accumulated data, so leftover partial data cannot hide it):
chunk = s.recv(buffer_size)
if not chunk:
    break
Full code:
def receive_bar_updates(s):
    global all_bars
    data = ''
    buffer_size = 4096
    while True:
        chunk = s.recv(buffer_size)
        if not chunk:  # empty result: the other end closed the connection
            break
        data += chunk
        if not data.endswith("\n"):
            continue
        lines = data.split("\n")
        lines = filter(None, lines)
        for line in lines:
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
        data = ""
Edit2: Oops, wrong code
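The end-of-stream behaviour described above is easy to check: after the peer shuts down its sending side, recv() returns an empty bytes object (b'' in Python 3, '' in Python 2) rather than None, and keeps returning it on every later call. A small sketch, assuming Python 3 and using socket.socketpair() in place of a real TCP connection; drain is a hypothetical helper:

```python
import socket


def drain(sock: socket.socket) -> bytes:
    """Read until recv() signals end of stream and return everything received."""
    data = b""
    while True:
        chunk = sock.recv(4096)
        if chunk == b"":  # orderly shutdown by the peer, not None
            return data
        data += chunk


a, b = socket.socketpair()
b.sendall(b"last line\r\n")
b.shutdown(socket.SHUT_WR)  # peer finishes sending

print(drain(a))       # b'last line\r\n'
print(a.recv(4096))   # b'' again: a loop that ignores this spins forever
a.close()
b.close()
```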
Upvotes: 0
Reputation: 148975
If you have raw input that you want to process as lines, the io module is your friend, because it does the low-level work of assembling packets into lines.
You could use:
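import io

class SocketIO(io.RawIOBase):
    def __init__(self, sock):
        self.sock = sock
    def readable(self):
        return True
    def read(self, sz=-1):
        if sz is None or sz < 0:
            # cap a single recv(): CPython allocates a buffer of the requested size up front
            sz = 65536
        return self.sock.recv(sz)
    def seekable(self):
        return False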
It is more robust than endswith('\n') because if one packet contains an embedded newline ('ab\ncd'), the io module will correctly process it. Your code could become:
def receive_bar_updates(s):
    global all_bars
    data = ''
    buffer_size = 4096
    fd = SocketIO(s)  # fd can be used as an input file object
    for line in fd:
        if should_be_rejected_by_filter(line): continue  # do not know what filter does...
        if line.startswith("BH") or line.startswith("BC"):
            symbol = str(line.split(",")[1])
            all_bars[symbol].append(line)
            y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
            y.start()
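For Python 3, a variant of the same idea that follows the io.RawIOBase contract more closely implements readinto() (the default read() is built on it) and puts an io.BufferedReader on top. The bare IOBase.readline() would otherwise fetch one byte per recv(). Lines come out as bytes, so the startswith("BH") checks above would need b"BH" or a decode step. A sketch under those assumptions; SocketRawIO is a hypothetical name, and the standard library's own socket.makefile() uses a similar RawIOBase wrapper internally:

```python
import io
import socket


class SocketRawIO(io.RawIOBase):
    """Minimal read-only raw stream over a connected socket."""

    def __init__(self, sock: socket.socket):
        super().__init__()
        self.sock = sock

    def readable(self) -> bool:
        return True

    def readinto(self, b) -> int:
        # recv_into() fills the caller's buffer and returns the byte count;
        # 0 means the peer closed the connection, which io treats as EOF.
        return self.sock.recv_into(b)


if __name__ == "__main__":
    reader, writer = socket.socketpair()
    writer.sendall(b"BH,AAPL,1\r\nBC,MS")
    writer.sendall(b"FT,2\r\n")
    writer.shutdown(socket.SHUT_WR)
    # BufferedReader assembles the lines from whatever chunks recv_into returns.
    for line in io.BufferedReader(SocketRawIO(reader)):
        print(line)  # b'BH,AAPL,1\r\n', then b'BC,MSFT,2\r\n'
    reader.close()
    writer.close()
```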
Upvotes: 6
Reputation: 2847
You basically seem to want to read lines from the socket. Maybe you're better off not using low-level recv calls and instead using sock.makefile(), treating the result as a regular file you can read lines from: for line in sfile: ...
That leaves the delay/chunk issue. This is likely caused by Nagle's algorithm on the sending side. Try disabling it on the sending socket:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
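Because TCP_NODELAY only changes how the socket it is set on sends data, the line above has an effect only when it runs on the sender's socket; the question says the other end is outside the asker's control. A sketch showing the option set and read back on a fresh TCP socket, assuming Python 3; make_nodelay_socket is a hypothetical helper:

```python
import socket


def make_nodelay_socket() -> socket.socket:
    """Create a TCP socket with Nagle's algorithm disabled.

    TCP_NODELAY affects only writes made through this socket, so it belongs
    on the sender; setting it on the receiver does not change how the peer
    batches its writes.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    return sock


if __name__ == "__main__":
    s = make_nodelay_socket()
    # getsockopt reports a non-zero value once the option is enabled.
    print(s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0)  # True
    s.close()
```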
Upvotes: -1