Reputation: 16097
I'm dipping my toes into Python threading. I've created a supplier thread that returns me character/line data from a *nix (serial) /dev via a Queue.
As an exercise, I would like to consume the data from the queue one line at a time (using '\n' as the line terminator).
My current (simplistic) solution is to put() only 1 character at a time into the queue, so the consumer will only get() one character at a time. (Is this a safe assumption?) This approach currently allows me to do the following:
...
return_buffer = []
while True:
rcv_data = queue.get(block=True)
return_buffer.append(rcv_data)
if rcv_data == "\n":
return return_buffer
This seems to be working, but I can definitely cause it to fail when I put() 2 characters at a time.
I would like to make the receive logic more generic and able to handle multi-character put()s.
My next approach would be to rcv_data.partition("\n"), putting the "remainder" in yet another buffer/list, but that will require juggling the temporary buffer alongside the queue. (I guess another approach would be to only put() one line at a time, but where's the fun in that?)
Is there a more elegant way to read from a queue one line at a time?
Upvotes: 4
Views: 3299
Reputation: 2797
It's important to note that there could be multiple lines in the queue. This function will return (and optionally print) all the lines from a given queue:
def getQueueContents(queue, printContents=True):
contents = ''
# get the full queue contents, not just a single line
while not queue.empty():
line = queue.get_nowait()
contents += line
if printContents:
# remove the newline at the end
print line[:-1]
return contents
Upvotes: 1
Reputation: 13508
This may be a good use for a generator. It will pick up exactly where it left off after yield, so reduces the amount of storage and buffer swapping you need (I cannot speak to its performance).
def getLineGenerator(queue, splitOn):
return_buffer = []
while True:
rcv_data = queue.get(block=True) # We can pull any number of characters here.
for c in rcv_data:
return_buffer.append(c)
if c == splitOn:
yield return_buffer
return_buffer = []
gen = getLineGenerator(myQueue, "\n")
for line in gen:
print line.strip()
Edit:
Once J.F. Sebastian pointed out that the line separator could be multi-character I had to solve that case as well. I also used StringIO from jdi's answer. Again I cannot speak to the efficiency, but I believe it is correct in all cases (at least the ones I could think of). This is untested, so would probably need some tweaks to actually run. Thanks to J.F. Sebastian and jdi for their answers which ultimately lead to this one.
def getlines(chunks, splitOn="\n"):
r_buffer = StringIO()
for chunk in chunks
r_buffer.write(chunk)
pos = r_buffer.getvalue().find(splitOn) # can't use rfind see the next comment
while pos != -1: # A single chunk may have more than one separator
line = r_buffer.getvalue()[:pos + len(splitOn)]
yield line
rest = r_buffer.getvalue().split(splitOn, 1)[1]
r_buffer.seek(0)
r_buffer.truncate()
r_buffer.write(rest)
pos = rest.find(splitOn) # rest and r_buffer are equivalent at this point. Use rest to avoid an extra call to getvalue
line = r_buffer.getvalue();
r_buffer.close() # just for completeness
yield line # whatever is left over.
for line in getlines(iter(queue.get, None)): # break on queue.put(None)
process(line)
Upvotes: 3
Reputation: 414179
The queue returns exactly what you put in it. If you put fragments you get fragments. If you put lines you get lines.
To consume line by line if partial lines in the input are allowed and could be completed later you need a buffer either explicit or implicit to store partial lines:
def getlines(fragments, linesep='\n'):
buff = []
for fragment in fragments:
pos = fragment.rfind(linesep)
if pos != -1: # linesep in fragment
lines = fragment[:pos].split(linesep)
if buff: # start of line from previous fragment
line[0] = ''.join(buff) + line[0] # prepend
del buff[:] # clear buffer
rest = fragment[pos+len(linesep):]
if rest:
buff.append(rest)
yield from lines
elif fragment: # linesep not in fragment, fragment is not empty
buff.append(fragment)
if buff:
yield ''.join(buff) # flush the rest
It allows fragments, linesep of arbitrary length. linesep should not span several fragments.
Usage:
for line in getlines(iter(queue.get, None)): # break on queue.put(None)
process(line)
Upvotes: 1
Reputation: 92569
If your specific use-case producer needs to put to the queue character by character, then I suppose I can't see anything wrong with getting them in a loop in the consumer. But you can probably get better performance by using a StringIO
object as the buffer.
from cStringIO import StringIO
# python3: from io import StringIO
buf = StringIO()
The object if file-like, so you can write
to it, seek it, and call getvalue()
at any time to get the complete string value in the buffer. This will most likely give you much better performance than having to constantly grow a list, join it to a string, and clear it.
return_buffer = StringIO()
while True:
rcv_data = queue.get(block=True)
return_buffer.write(rcv_data)
if rcv_data == "\n":
ret = return_buffer.getvalue()
return_buffer.seek(0)
# truncate, unless you are counting bytes and
# reading the data directly each time
return_buffer.truncate()
return ret
Upvotes: 2