Pedro Lobito

Reputation: 98941

process socket data that ends with a line break

What is the best approach to processing a socket connection where I need the variable data to end with a line break (\n)? I'm using the code below, but sometimes the TCP packets get chunked and it takes a long time to match data.endswith("\n"). I've also tried other approaches, like saving the last line if it doesn't end with \n and appending it to data on the next loop, but this also doesn't work because multiple packets get chunked and the first and second parts don't match. I have no control over the other end; it basically sends multiple lines that end in \r\n.

Any suggestion will be welcome, as I don't have much knowledge on socket connections.

def receive_bar_updates(s):
    global all_bars
    data = ''
    buffer_size = 4096
    while True:
        data += s.recv(buffer_size)
        if not data.endswith("\n"):
            continue
        lines = data.split("\n")
        lines = filter(None, lines)
        for line in lines:
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
        data = ""

Example of "normal" data:

line1\r\n
line2\r\n
line3\r\n

Example of chunked data:

line1\r\n
line2\r\n
lin

Upvotes: 3

Views: 19310

Answers (5)

M. Leonhard

Reputation: 1572

Use socket.socket.makefile() to wrap the socket in a class that implements text I/O. It handles buffering, converts between bytes and strings, and lets you iterate over lines. Remember to flush any writes.

Example:

#!/usr/bin/env python3
import socket, threading, time


def client(addr):
    with socket.create_connection(addr) as conn:
        conn.sendall(b'aaa')
        time.sleep(1)
        conn.sendall(b'bbb\n')
        time.sleep(1)
        conn.sendall(b'cccddd\n')
        time.sleep(1)
        conn.sendall(b'eeefff')
        time.sleep(1)
        conn.sendall(b'\n')
        conn.shutdown(socket.SHUT_WR)
        response = conn.recv(1024)
        print('client got %r' % (response,))


def main():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as listen_socket:
        listen_socket.bind(('localhost', 0))
        listen_socket.listen(1)
        addr = listen_socket.getsockname()
        threading.Thread(target=client, args=(addr,)).start()
        conn, _addr = listen_socket.accept()
        conn_file = conn.makefile(mode='rw', encoding='utf-8')
        for request in conn_file:
            print('server got %r' % (request,))
        conn_file.write('response1\n')
        conn_file.flush()


if __name__ == '__main__':
    main()
$ ./example.py
server got 'aaabbb\n'
server got 'cccddd\n'
server got 'eeefff\n'
client got b'response1\n'
$
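Since the peer in the question terminates lines with \r\n, the newline argument of makefile() matters: with the default newline=None, "\r\n" is translated to "\n", while newline="" keeps the terminator as sent. A minimal Python 3 sketch, using socket.socketpair() (an already-connected pair, so no server is needed) and chunks split like the question's example:

```python
import socket

# socketpair() returns two connected sockets; 'a' plays the remote sender.
a, b = socket.socketpair()
with a, b:
    # Send \r\n-terminated lines in awkward pieces, as in the question.
    a.sendall(b"line1\r\nline2\r\nlin")
    a.sendall(b"e3\r\n")
    a.shutdown(socket.SHUT_WR)  # signal EOF so the for loop below ends

    # newline="" keeps "\r\n" as the line terminator instead of translating it.
    with b.makefile("r", encoding="utf-8", newline="") as f:
        lines = [line.rstrip("\r\n") for line in f]

print(lines)  # ['line1', 'line2', 'line3']
```

The file object reassembles "lin" and "e3\r\n" into one line, so the caller never sees the chunk boundaries.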

Upvotes: 5

bennr01

Reputation: 31

I have not tested this code, but it should work:

def receive_bar_updates(s):
    global all_bars
    data = ''
    buf = ''
    buffer_size = 4096
    while True:
        if "\r\n" not in data:  # skip recv if we already have another line buffered.
            chunk = s.recv(buffer_size)
            if not chunk:  # empty result: the peer closed the connection
                break
            data += chunk
        if "\r\n" not in data:
            continue
        i = data.rfind("\r\n")
        data, buf = data[:i+2], data[i+2:]
        lines = data.split("\r\n")
        lines = filter(None, lines)
        for line in lines:
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
        data = buf

Edit: Forgot to mention: I only modified the code for receiving the data; I have no idea what the rest of the function (starting with lines = data.split("\n")) is for.

Edit 2: Now uses "\r\n" for linebreaks instead of "\n".

Edit 3: Fixed an issue.
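The same carry-over idea (split only up to the last \r\n and keep the rest for the next iteration) can be written as a small helper that is easy to test without a socket. This is a Python 3 sketch, where recv() returns bytes; split_lines is a hypothetical name, and the list of chunks simulates arbitrary TCP splits, including one that cuts a \r\n in half.

```python
def split_lines(buffer, chunk, sep=b"\r\n"):
    """Append chunk to buffer and return (complete_lines, leftover).

    Only data up to the last separator is split; anything after it is an
    incomplete line and is returned as leftover for the next recv().
    """
    buffer += chunk
    head, found, tail = buffer.rpartition(sep)
    if not found:
        return [], buffer  # no complete line yet, keep everything
    lines = [line for line in head.split(sep) if line]  # drop empty lines
    return lines, tail


# Simulated recv() results, split at arbitrary points.
chunks = [b"line1\r\nline2\r\nlin", b"e3\r", b"\nline4\r\n"]
leftover = b""
received = []
for chunk in chunks:
    lines, leftover = split_lines(leftover, chunk)
    received.extend(lines)

print(received)  # [b'line1', b'line2', b'line3', b'line4']
```

In a real loop, leftover would be passed back in with each new s.recv(buffer_size) result, and an empty result would end the loop.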

Upvotes: 0

Bushman

Reputation: 173

Are you accepting different connections? Or is it one stream of data, split up by \r\n's?

When accepting multiple connections, you'd wait for a connection with s.accept(), read until you have the whole packet, process its data, and then wait for the next connection. What you do then depends on the structure of each packet. (Example: https://wiki.python.org/moin/TcpCommunication)

If instead you are consuming a stream of data, you should probably process each 'line' you find in a separate thread, while you keep consuming on another.
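A common variant of this is one reader loop feeding a queue.Queue that a single worker thread drains, instead of starting a new Thread per line as the question's code does. The sketch below is Python 3; the sample lines are hypothetical, and the list stands in for lines read from the socket.

```python
import queue
import threading

results = []

def handle(line):
    # Same filter as the question: keep only BH/BC lines, record the symbol.
    if line.startswith(("BH", "BC")):
        results.append(line.split(",")[1])

def worker(q):
    # Consumer: process lines until the None sentinel arrives.
    while True:
        line = q.get()
        if line is None:
            break
        handle(line)

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()

# Producer: in the real program this loop would read lines from the socket.
for line in ["BH,AAPL,1", "XX,ignored", "BC,MSFT,2"]:  # hypothetical sample lines
    q.put(line)
q.put(None)  # tell the worker there is nothing more to process
t.join()

print(results)  # ['AAPL', 'MSFT']
```

The reader never waits on processing, and the worker handles lines in arrival order.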

Edit: So, if I understand your situation correctly: there is one connection, and the data is a string broken up by \r\n, so each line ends with \n. The data, however, does not arrive as you expect, and the loop spins forever waiting for a \n.

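The socket interface, as I understand it, signals the end of the stream with an empty result: once the peer closes the connection, recv() returns an empty string, not None. So the last buffer might have ended with a \n, but the loop then just keeps getting empty strings while trying to find another \n.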

Instead, try stopping as soon as recv() returns nothing:

chunk = s.recv(buffer_size)
if not chunk:
    break

Full code:

def receive_bar_updates(s):
    global all_bars
    data = ''
    buffer_size = 4096
    while True:
        chunk = s.recv(buffer_size)
        if not chunk:  # empty result: the peer closed the connection
            break
        data += chunk
        if not data.endswith("\n"):
            continue
        lines = data.split("\n")
        lines = filter(None, lines)
        for line in lines:
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
        data = ""

Edit2: Oops, wrong code
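The end-of-stream behavior can be checked directly. In this Python 3 sketch, socket.socketpair() provides a connected pair; after the peer closes, recv() first returns whatever data is still buffered and then b"" (an empty str in Python 2), never None.

```python
import socket

a, b = socket.socketpair()  # two connected sockets; 'a' plays the remote peer
with b:
    a.sendall(b"last line\r\n")
    a.close()  # the peer closes the connection

    chunks = []
    while True:
        chunk = b.recv(4096)
        if not chunk:  # b"" (not None) marks the end of the stream
            break
        chunks.append(chunk)

print(b"".join(chunks))  # b'last line\r\n'
```

Testing the freshly received chunk, rather than the accumulated buffer, is what guarantees the loop stops even when an incomplete line is still buffered.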

Upvotes: 0

Serge Ballesta

Reputation: 148975

If you have a raw input that you want to process as lines, the io module is your friend, because it does the low-level assembling of packets into lines.

You could use:

import io

class SocketIO(io.RawIOBase):
    def __init__(self, sock):
        self.sock = sock
    def readable(self):
        return True
    def readinto(self, b):
        # io.RawIOBase implements read() on top of readinto()
        return self.sock.recv_into(b)  # returns 0 once the peer closes
    def seekable(self):
        return False

It is more robust than endswith('\n') because if one packet contains an embedded newline ('ab\ncd'), the io module will correctly process it. Your code could become:

def receive_bar_updates(s):
    global all_bars
    fd = io.BufferedReader(SocketIO(s))  # fd can be used as an input file object

    for line in fd:
        line = line.rstrip("\r\n")  # drop the line terminator
        if not line: continue  # skip empty lines, as filter(None, lines) did
        if line.startswith("BH") or line.startswith("BC"):
            symbol = str(line.split(",")[1])
            all_bars[symbol].append(line)
            y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
            y.start()
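On Python 3, recv() returns bytes, so lines read from a BufferedReader are bytes too. One way to get str lines is to stack io.TextIOWrapper on top. This is a sketch rather than the answer's exact code: iter_lines is a hypothetical helper, and newline="" is chosen so that "\r\n" is recognized as a single line ending.

```python
import io
import socket

class SocketIO(io.RawIOBase):
    """Read-only raw stream over a connected socket (Python 3 sketch)."""
    def __init__(self, sock):
        self.sock = sock
    def readable(self):
        return True
    def readinto(self, b):
        return self.sock.recv_into(b)  # 0 bytes read means the peer closed

def iter_lines(sock):
    # BufferedReader batches recv() calls; TextIOWrapper decodes bytes to str.
    # newline="" splits lines but keeps "\r\n" untranslated, so strip it here.
    text = io.TextIOWrapper(io.BufferedReader(SocketIO(sock)),
                            encoding="utf-8", newline="")
    for line in text:
        yield line.rstrip("\r\n")

a, b = socket.socketpair()
with a, b:
    a.sendall(b"BH,AAPL,1\r\nXX,1\r\nBC,MS")  # last line arrives in two pieces
    a.sendall(b"FT,2\r\n")
    a.shutdown(socket.SHUT_WR)  # EOF, so the iteration below terminates
    lines = list(iter_lines(b))

print(lines)  # ['BH,AAPL,1', 'XX,1', 'BC,MSFT,2']
```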

Upvotes: 6

Irmen de Jong

Reputation: 2847

You basically seem to want to read lines from the socket. Maybe you're better off not using low-level recv calls and just using sock.makefile(), treating the result as a regular file you can read lines from: for line in sfile: ...

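That leaves the delay/chunk issue. This is likely to be caused by Nagle's algorithm on the sending side. Try disabling that (the option affects data sent from the socket it is set on, so it has to be set by the sending program):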

sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
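TCP_NODELAY is a per-socket option that changes how the socket it is set on sends data. The Python 3 sketch below sets it on a loopback client socket and reads it back with getsockopt(); the throwaway listener is only there to have something to connect to, and the exact non-zero value returned after enabling it may differ between platforms.

```python
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    with socket.create_connection(listener.getsockname()) as sender:
        # Nagle's algorithm is on by default, so TCP_NODELAY starts at 0.
        before = sender.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
        sender.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        after = sender.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
        receiver, _ = listener.accept()
        receiver.close()

print(before, after)
```

Setting the option on the receiving socket would not change how the remote end batches its writes.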

Upvotes: -1
