JS.

Reputation: 16097

Reading lines from Python queues

I'm dipping my toes into Python threading. I've created a supplier thread that returns me character/line data from a *nix (serial) /dev via a Queue.

As an exercise, I would like to consume the data from the queue one line at a time (using '\n' as the line terminator).

My current (simplistic) solution is to put() only 1 character at a time into the queue, so the consumer will only get() one character at a time. (Is this a safe assumption?) This approach currently allows me to do the following:

...
return_buffer = []
while True:
    rcv_data = queue.get(block=True)
    return_buffer.append(rcv_data)        
    if rcv_data == "\n":
        return return_buffer

This seems to be working, but I can definitely cause it to fail when I put() 2 characters at a time.

I would like to make the receive logic more generic and able to handle multi-character put()s.

My next approach would be to rcv_data.partition("\n"), putting the "remainder" in yet another buffer/list, but that will require juggling the temporary buffer alongside the queue. (I guess another approach would be to only put() one line at a time, but where's the fun in that?)
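For what it's worth, that partition-based approach can be sketched roughly like this (a minimal sketch; the `readline` name and the `carry` list are illustrative, not from the post — it blocks until a full line arrives):

```python
from queue import Queue  # Queue.Queue on Python 2

def readline(q, carry):
    """Return the next '\n'-terminated line from queue q.
    carry is a list holding leftover data between calls."""
    while True:
        pending = "".join(carry)
        line, sep, rest = pending.partition("\n")
        if sep:                     # a complete line is buffered
            carry[:] = [rest]       # keep the remainder for the next call
            return line + sep
        carry.append(q.get(block=True))  # need more data

q = Queue()
q.put("foo\nba")   # a single put() may span line boundaries
q.put("r\n")
carry = []
print(repr(readline(q, carry)))  # 'foo\n'
print(repr(readline(q, carry)))  # 'bar\n'
```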

Is there a more elegant way to read from a queue one line at a time?

Upvotes: 4

Views: 3299

Answers (4)

blented

Reputation: 2797

It's important to note that there could be multiple lines in the queue. This function will return (and optionally print) all the lines from a given queue:

def getQueueContents(queue, printContents=True):
    contents = ''
    # get the full queue contents, not just a single line
    while not queue.empty():
        line = queue.get_nowait()
        contents += line
        if printContents:
            # remove the newline at the end
            print(line[:-1])
    return contents

Upvotes: 1

grieve

Reputation: 13508

This may be a good use for a generator. It will pick up exactly where it left off after yield, so it reduces the amount of storage and buffer swapping you need (I cannot speak to its performance).

def getLineGenerator(queue, splitOn):
    return_buffer = []
    while True:
        rcv_data = queue.get(block=True) # We can pull any number of characters here.
        for c in rcv_data:
            return_buffer.append(c)
            if c == splitOn:
                yield "".join(return_buffer)
                return_buffer = []


gen = getLineGenerator(myQueue, "\n")
for line in gen:
    print(line.strip())

Edit:

Once J.F. Sebastian pointed out that the line separator could be multi-character, I had to solve that case as well. I also used StringIO from jdi's answer. Again I cannot speak to the efficiency, but I believe it is correct in all the cases I could think of. This is untested, so it would probably need some tweaks to actually run. Thanks to J.F. Sebastian and jdi for their answers, which ultimately led to this one.

def getlines(chunks, splitOn="\n"):
    r_buffer = StringIO()
    for chunk in chunks:
        r_buffer.write(chunk)
        pos = r_buffer.getvalue().find(splitOn) # can't use rfind see the next comment
        while pos != -1: # A single chunk may have more than one separator
            line = r_buffer.getvalue()[:pos + len(splitOn)]
            yield line
            rest = r_buffer.getvalue().split(splitOn, 1)[1]
            r_buffer.seek(0)
            r_buffer.truncate()
            r_buffer.write(rest)
            pos = rest.find(splitOn) # rest and r_buffer are equivalent at this point. Use rest to avoid an extra call to getvalue

    line = r_buffer.getvalue()
    r_buffer.close() # just for completeness
    yield line # whatever is left over.

for line in getlines(iter(queue.get, None)): # break on queue.put(None)
    process(line)

Upvotes: 3

jfs

Reputation: 414179

The queue returns exactly what you put in it. If you put fragments you get fragments. If you put lines you get lines.
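A minimal illustration of that point:

```python
from queue import Queue  # Queue.Queue on Python 2

q = Queue()
q.put("a whole line\n")   # put a line...
q.put("two ")             # ...or fragments
q.put("fragments\n")

# get() hands back exactly the strings that were put, fragment boundaries intact
print(repr(q.get()))  # 'a whole line\n'
print(repr(q.get()))  # 'two '
print(repr(q.get()))  # 'fragments\n'
```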

To consume line by line when partial lines are allowed in the input (and may be completed later), you need a buffer, either explicit or implicit, to store the partial lines:

def getlines(fragments, linesep='\n'):
    buff = []
    for fragment in fragments:
        pos = fragment.rfind(linesep)
        if pos != -1: # linesep in fragment
           lines = fragment[:pos].split(linesep)
           if buff: # start of line from previous fragment
              lines[0] = ''.join(buff) + lines[0] # prepend
              del buff[:] # clear buffer
           rest = fragment[pos+len(linesep):]
           if rest:
              buff.append(rest)
           yield from lines
        elif fragment: # linesep not in fragment, fragment is not empty
           buff.append(fragment)

    if buff:
       yield ''.join(buff) # flush the rest

It allows fragments and a linesep of arbitrary length; however, linesep should not span several fragments.

Usage:

for line in getlines(iter(queue.get, None)): # break on queue.put(None)
    process(line)

Upvotes: 1

jdi

Reputation: 92569

If your specific use case requires the producer to put to the queue character by character, then I suppose I can't see anything wrong with getting them in a loop in the consumer. But you can probably get better performance by using a StringIO object as the buffer.

from cStringIO import StringIO
# python3: from io import StringIO

buf = StringIO()

The object is file-like, so you can write to it, seek it, and call getvalue() at any time to get the complete string value in the buffer. This will most likely give you much better performance than having to constantly grow a list, join it to a string, and clear it.
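For example, the write/seek/getvalue cycle looks like this (a quick sketch using Python 3's io.StringIO):

```python
from io import StringIO  # cStringIO.StringIO on Python 2

buf = StringIO()
buf.write("partial ")
buf.write("line\n")
print(repr(buf.getvalue()))  # 'partial line\n'

# reset the buffer for reuse, as the consumer below does
buf.seek(0)
buf.truncate()
print(repr(buf.getvalue()))  # ''
```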

return_buffer = StringIO()
while True:
    rcv_data = queue.get(block=True)
    return_buffer.write(rcv_data)        
    if rcv_data == "\n":
        ret = return_buffer.getvalue()
        return_buffer.seek(0)
        # truncate, unless you are counting bytes and
        # reading the data directly each time
        return_buffer.truncate()

        return ret

Upvotes: 2
