Py42

Reputation: 91

python read file non blocking on windows

I have a program on Windows (Win7) that writes to a txt file every x seconds, and a Python script that reads this txt file every x seconds. When the Python script is reading the file at the same moment the other program wants to write to it, the writing program crashes (and displays a permission error). Since I can't modify the way the program writes to the txt file, I have to open the file without blocking the writing program. Does anyone know what I could do in this situation (reading without blocking)? I would be very happy for any tip on this topic!

The code for the program that reads the file looks something like this:

    import codecs

    with codecs.open(datapath, "r", "utf-16") as raw_data:
        raw_data_x = raw_data.readlines()

I have to open the file with "codecs" because it's in Unicode (UTF-16).

Upvotes: 9

Views: 4251

Answers (2)

lego king

Reputation: 638

Recently I had to do non-blocking I/O on stdin and stdout in Python with cross-platform compatibility.
For Linux:
On Linux we can use the select module, which is a wrapper around the POSIX select function. You pass it multiple file descriptors and it waits until they are ready; once they are, you are notified and can perform the read or write. Here is a little code to give you the idea.
Here, nodejs is a Docker environment with a nodejs image:

    import json
    import os
    import select
    from io import BytesIO

    stdin_buf = BytesIO(json.dumps(fn) + "\n")
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]
    while (len(wselect) + len(rselect)) > 0:
        # block until at least one pipe is ready for reading or writing
        rready, wready, _ = select.select(rselect, wselect, [])
        try:
            if nodejs.stdin in wready:
                b = stdin_buf.read(select.PIPE_BUF)
                if b:
                    os.write(nodejs.stdin.fileno(), b)
                else:
                    # all input has been sent
                    wselect = []
            for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
                if pipes[0] in rready:
                    b = os.read(pipes[0].fileno(), select.PIPE_BUF)
                    if b:
                        pipes[1].write(b)
                    else:
                        # an empty read means the pipe was closed
                        rselect.remove(pipes[0])
            if stdout_buf.getvalue().endswith("\n"):
                rselect = []
        except OSError as e:
            break

For Windows:
The example above performs both reads and writes on stdin and stdout. This code won't work on Windows, because the select implementation there does not accept stdin or stdout as arguments.
The docs say:

File objects on Windows are not acceptable, but sockets are. On Windows, the underlying select() function is provided by the WinSock library, and does not handle file descriptors that don’t originate from WinSock.
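To make the quoted restriction concrete, here is a minimal sketch of select used with sockets, which is the one case the docs say is accepted on every platform. The helper name select_roundtrip is made up for this illustration, and socket.socketpair() simply stands in for any socket-based channel.

```python
import select
import socket


def select_roundtrip(payload, timeout=1.0):
    """Send payload over a socket pair and read it back once select reports it readable."""
    left, right = socket.socketpair()
    try:
        left.sendall(payload)
        # select accepts sockets on all platforms, so this call also works on Windows.
        readable, _, _ = select.select([right], [], [], timeout)
        return right.recv(len(payload)) if right in readable else b""
    finally:
        left.close()
        right.close()
```

Passing a pipe or an ordinary file object in place of the sockets is what fails on Windows, which is why the rest of this answer falls back to threads.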

First I must mention that there are a lot of libraries for non-blocking I/O on Windows, such as asyncio (Python 3), gevent (for Python 2.7), msvcrt, and pywin32's win32event, which alerts you when your socket is ready to read or write data. But none of them allowed me to read from or write to stdin/stdout; they gave errors like
An operation was attempted on something that is not a socket
Handles only expect integer values
and so on. There are some other libraries, like Twisted, that I haven't tried.

Now, to implement the same functionality as the code above on Windows, I used threads. Here is my code:

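    import json
    import os
    import sys
    import threading
    import Queue  # Python 2 name; on Python 3 use "import queue as Queue"
    from io import BytesIO

    stdin_buf = BytesIO(json.dumps(fn) + "\n")
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()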

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]
    READ_BYTES_SIZE = 512

    # creating queue for reading from a thread to queue
    input_queue = Queue.Queue()
    output_queue = Queue.Queue()
    error_queue = Queue.Queue()

    # To tell threads that output has ended and threads can safely exit
    no_more_output = threading.Lock()
    no_more_output.acquire()
    no_more_error = threading.Lock()
    no_more_error.acquire()

    # put constructed command to input queue which then will be passed to nodejs's stdin
    def put_input(input_queue):
        while True:
            sys.stdout.flush()
            b = stdin_buf.read(READ_BYTES_SIZE)
            if b:
                input_queue.put(b)
            else:
                break

    # get the output from nodejs's stdout and continue till output ends
    def get_output(output_queue):
        while not no_more_output.acquire(False):
            b = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
            if b:
                output_queue.put(b)

    # get the output from nodejs's stderr and continue till error output ends
    def get_error(error_queue):
        while not no_more_error.acquire(False):
            b = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
            if b:
                error_queue.put(b)

    # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
    input_thread = threading.Thread(target=put_input, args=(input_queue,))
    input_thread.start()
    output_thread = threading.Thread(target=get_output, args=(output_queue,))
    output_thread.start()
    error_thread = threading.Thread(target=get_error, args=(error_queue,))
    error_thread.start()

    # mark if output/error is ready
    output_ready=False
    error_ready=False

    while (len(wselect) + len(rselect)) > 0:
        try:
            if nodejs.stdin in wselect:
                if not input_queue.empty():
                    os.write(nodejs.stdin.fileno(), input_queue.get())
                elif not input_thread.is_alive():
                    wselect = []
            if nodejs.stdout in rselect:
                if not output_queue.empty():
                    output_ready = True
                    stdout_buf.write(output_queue.get())
                elif output_ready:
                    rselect = []
                    no_more_output.release()
                    no_more_error.release()
                    output_thread.join()

            if nodejs.stderr in rselect:
                if not error_queue.empty():
                    error_ready = True
                    stderr_buf.write(error_queue.get())
                elif error_ready:
                    rselect = []
                    no_more_output.release()
                    no_more_error.release()
                    output_thread.join()
                    error_thread.join()
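            # rselect is already empty if a branch above released the locks;
            # releasing an unlocked lock again would raise an error
            if rselect and stdout_buf.getvalue().endswith("\n"):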
                rselect = []
                no_more_output.release()
                no_more_error.release()
                output_thread.join()
        except OSError as e:
            break

So the best option for me turned out to be threads. This article is a nice read if you want to know more.
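The thread-per-pipe idea can be sketched more compactly in Python 3. This is an illustrative sketch rather than the code above: the names _pump and run_and_collect are made up here, and a None sentinel on each queue replaces the locks as the end-of-output signal.

```python
import os
import queue
import subprocess
import sys
import threading


def _pump(pipe, out_queue):
    """Copy chunks from a blocking pipe into a queue; None marks end of output."""
    # os.read returns whatever is available (up to 512 bytes) and b"" at EOF.
    for chunk in iter(lambda: os.read(pipe.fileno(), 512), b""):
        out_queue.put(chunk)
    out_queue.put(None)


def run_and_collect(cmd, stdin_data=b""):
    """Run cmd, feed it stdin_data, and return (stdout, stderr) as bytes."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out_q, err_q = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=_pump, args=(proc.stdout, out_q), daemon=True),
        threading.Thread(target=_pump, args=(proc.stderr, err_q), daemon=True),
    ]
    for t in threads:
        t.start()
    # The reader threads drain both pipes, so the child cannot stall on a full pipe
    # while we are still writing its input.
    proc.stdin.write(stdin_data)
    proc.stdin.close()
    # Queue.get blocks until a chunk or the None sentinel arrives.
    stdout_data = b"".join(iter(out_q.get, None))
    stderr_data = b"".join(iter(err_q.get, None))
    for t in threads:
        t.join()
    proc.wait()
    proc.stdout.close()
    proc.stderr.close()
    return stdout_data, stderr_data
```

For example, run_and_collect([sys.executable, "-c", "print('hi')"]) returns the child's stdout and stderr as two byte strings. Because only blocking reads in threads are used, the same code should run on both Windows and Linux.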

Upvotes: 0

Bharel

Reputation: 26981

After a long time, I managed to create a function that does it for you using ctypes. Keep in mind this will only work if the writing process didn't acquire "exclusive" access. If it did, you're out of luck and will need to use a shadow copy service, like shown here or implemented here.
Anyway, here you go:

import ctypes
from ctypes import wintypes
import os
import msvcrt

GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000

OPEN_EXISTING = 3
OPEN_ALWAYS = 4

ACCESS_MODES = {
    "r": GENERIC_READ,
    "w": GENERIC_WRITE,
    "r+": (GENERIC_READ|GENERIC_WRITE)
}

OPEN_MODES = {
    "r": OPEN_EXISTING,
    "w": OPEN_ALWAYS,
    "r+": OPEN_ALWAYS,
}


def open_file_nonblocking(filename, access):
    # Removes the b for binary access.
    internal_access = access.replace("b", "")
    access_mode = ACCESS_MODES[internal_access]
    open_mode = OPEN_MODES[internal_access]

    # Declare the return type as HANDLE; the ctypes default (C int) could
    # truncate handles on 64-bit Windows.
    create_file = ctypes.WinDLL("kernel32", use_last_error=True).CreateFileW
    create_file.restype = wintypes.HANDLE
    handle = create_file(
        wintypes.LPWSTR(filename),
        wintypes.DWORD(access_mode),
        wintypes.DWORD(2|1),  # File share read and write
        ctypes.c_void_p(0),
        wintypes.DWORD(open_mode),
        wintypes.DWORD(0),
        wintypes.HANDLE(0)
    )

    # CreateFileW returns INVALID_HANDLE_VALUE (-1) on failure, e.g. when the
    # other process holds the file with exclusive access.
    if handle is None or handle == wintypes.HANDLE(-1).value:
        raise ctypes.WinError(ctypes.get_last_error())

    # Wrap the Windows handle in a C runtime file descriptor.
    fd = msvcrt.open_osfhandle(handle, 0)
    return os.fdopen(fd, access)

The function opens the file with read and write sharing enabled, so other processes can keep reading and writing it at the same time. It then converts the handle to a normal Python file object.
Make sure to close the file when you finish.
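For the UTF-16 file from the question, the file object can be read in binary mode and decoded afterwards. The sketch below is only an illustration: read_utf16_lines and its opener parameter are hypothetical names, and opener defaults to the built-in open so the snippet also runs on non-Windows systems.

```python
def read_utf16_lines(path, opener=open):
    """Read a UTF-16 file through opener(path, "rb") and return its lines."""
    raw = opener(path, "rb")
    try:
        data = raw.read()
    finally:
        # Close promptly so the handle is held as briefly as possible.
        raw.close()
    # keepends=True keeps the line endings, like readlines() does.
    return data.decode("utf-16").splitlines(True)
```

On Windows the call would be read_utf16_lines(datapath, opener=open_file_nonblocking), so the file is opened with the sharing flags set by the function above.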

Upvotes: 7
