Reputation: 2289
I would like to create an in-memory (temp file) data stream in Python. One thread fills the stream with data, and another one consumes it.
After checking the documentation for io - Core tools for working with streams, it seems to me that the io module is the best choice for this.
So I wrote a simple example:
#!/usr/local/bin/python3
# encoding: utf-8
import io

if __name__ == '__main__':
    a = io.BytesIO()
    a.write("hello".encode())
    txt = a.read(100)
    txt = txt.decode("utf-8")
    print(txt)
My example does not work. "hello" does not seem to be written to a and cannot be read back afterwards. So where is my error? How do I have to alter my code to get a file-like object in memory?
Upvotes: 5
Views: 3157
Reputation: 2289
@dhilmathy and @ShadowRanger mentioned that io.BytesIO() does not have separate read and write pointers.
I worked around this by creating a simple class that implements its own read pointer and remembers the number of bytes written. When the number of bytes read equals the number of bytes written, the buffer is truncated to save memory.
My solution so far:
#!/usr/local/bin/python3
# encoding: utf-8
import io
import threading


class memoryStreamIO(io.BytesIO):
    """
    memoryStreamIO

    An in-memory, file-like stream object with its own read pointer.
    """

    def __init__(self):
        super().__init__()
        self._wIndex = 0  # number of bytes written so far
        self._rIndex = 0  # offset of the next byte to read
        self._mutex = threading.Lock()

    def write(self, d: bytes):
        with self._mutex:
            # read() moves the shared position, so always append at the write index
            super().seek(self._wIndex)
            r = super().write(d)
            self._wIndex += len(d)
        return r

    def read(self, n: int = -1):
        with self._mutex:
            super().seek(self._rIndex)
            r = super().read(n)
            self._rIndex += len(r)
            # everything written has been read, so shrink the buffer to save memory
            if self._rIndex == self._wIndex:
                super().truncate(0)
                super().seek(0)
                self._rIndex = 0
                self._wIndex = 0
        return r

    def seek(self, n):
        with self._mutex:
            self._rIndex = n
            r = super().seek(n)
        return r


if __name__ == '__main__':
    a = memoryStreamIO()
    a.write("hello".encode())
    txt = (a.read(100)).decode()
    print(txt)
    a.write("abc".encode())
    txt = (a.read(100)).decode()
    print(txt)
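Since the original goal was one thread producing and another consuming, here is a rough sketch of a different technique for comparison: passing byte chunks through the standard library's queue.Queue instead of a shared BytesIO. This is not the class above, just an alternative under the assumption that chunk-wise hand-over is acceptable. The names producer, consumer, run_demo and the _DONE sentinel are made up for illustration.

```python
import queue
import threading

_DONE = None  # hypothetical sentinel that marks the end of the stream


def producer(q: queue.Queue, chunks) -> None:
    """Put every chunk on the queue, then signal the end of the stream."""
    for chunk in chunks:
        q.put(chunk)
    q.put(_DONE)


def consumer(q: queue.Queue, out: list) -> None:
    """Collect chunks until the sentinel arrives."""
    while True:
        chunk = q.get()  # blocks until the producer has put something
        if chunk is _DONE:
            break
        out.append(chunk)


def run_demo(chunks) -> bytes:
    """Run one producer and one consumer thread and return the received bytes."""
    q = queue.Queue()
    received = []
    t_cons = threading.Thread(target=consumer, args=(q, received))
    t_prod = threading.Thread(target=producer, args=(q, chunks))
    t_cons.start()
    t_prod.start()
    t_prod.join()
    t_cons.join()
    return b"".join(received)


print(run_demo([b"hello", b"abc"]).decode())  # helloabc
```

The queue does its own locking and blocks the consumer while no data is available, so no explicit mutex or read index is needed. The trade-off is that you read whole chunks rather than an arbitrary number of bytes.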
Upvotes: 3
Reputation: 2868
Actually, the data is written; reading it back is the problem. You should refer to class io.BytesIO. You can get the whole buffer contents using getvalue(), like this:
import io
a = io.BytesIO()
a.write("hello".encode())
txt = a.getvalue()
txt = txt.decode("utf-8")
print(txt)
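The empty read in the question comes from the stream position: after write() the position sits at the end of the buffer, so read() finds nothing left. A minimal sketch, assuming you are fine with rewinding manually via seek(0) (the helper name write_then_read is only for illustration):

```python
import io


def write_then_read(payload: bytes):
    """Return (bytes read without rewinding, bytes read after seek(0))."""
    buf = io.BytesIO()
    buf.write(payload)   # the position is now at the end of the buffer
    before = buf.read()  # nothing is left to read from the end
    buf.seek(0)          # rewind to the start of the buffer
    after = buf.read()   # now the whole payload comes back
    return before, after


before, after = write_then_read("hello".encode())
print(before)                 # b''
print(after.decode("utf-8"))  # hello
```

Unlike getvalue(), which always returns the complete buffer regardless of the position, seek(0) followed by read() moves the position again, so a later write() continues from wherever the read stopped.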
Upvotes: 3