Non blocking read on os.pipe on Windows Non blocking read on os.pipe on Windows windows windows

Non blocking read on os.pipe on Windows


Answering my own question after digging for some time through StackOverflow.

UPDATE: Things changed thanks to @HarryJohnston.

At first the answer was no: it is not possible to do a non-blocking read on os.pipe on Windows. From this answer I learned that:

The term for non-blocking / asynchronous I/O in Windows is 'overlapped' - that's what you should be looking at.

os.pipe on Windows is implemented through CreatePipe API (see here and ... well, I couldn't find os.pipe code in Python sources). CreatePipe makes anonymous pipes, and anonymous pipes do not support asynchronous I/O.

But then @HarryJohnston commented that the SetNamedPipeHandleState documentation allows putting an anonymous pipe into non-blocking mode. I wrote a test, and it failed with OSError: [Errno 22] Invalid argument. The error message seemed wrong, so I checked what a non-blocking read should return when no data is available. After reading the MSDN note on named pipe modes, I found that it should be ERROR_NO_DATA, which has the integer value 232. Adding a ctypes.WinError() call to the exception handler revealed the expected [Error 232] The pipe is being closed.

So, the answer is yes, it is possible to do non-blocking read on os.pipe on Windows, and here is the proof:

import os

# Win32 error code reported when a read on a PIPE_NOWAIT pipe finds no data.
ERROR_NO_DATA = 232

if os.name == 'nt':
    import msvcrt
    # POINTER is imported from ctypes itself: ctypes.wintypes stopped
    # re-exporting it in Python 3.
    from ctypes import windll, byref, wintypes, GetLastError, WinError, POINTER
    from ctypes.wintypes import HANDLE, DWORD, BOOL

    LPDWORD = POINTER(DWORD)
    PIPE_NOWAIT = wintypes.DWORD(0x00000001)


def pipe_no_wait(pipefd):
    """Put the pipe behind *pipefd* into non-blocking mode.

    pipefd is an integer file descriptor as returned by os.pipe().

    On Windows the underlying handle is switched to PIPE_NOWAIT with
    SetNamedPipeHandleState; elsewhere os.set_blocking() is used.

    Returns True on success. On failure the error is printed and False is
    returned.
    """
    if os.name != 'nt':
        try:
            os.set_blocking(pipefd, False)
        except OSError as exc:
            print(exc)
            return False
        return True

    SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
    SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
    SetNamedPipeHandleState.restype = BOOL

    h = msvcrt.get_osfhandle(pipefd)

    res = SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
    if res == 0:
        print(WinError())
        return False
    return True


if __name__ == '__main__':
    # CreatePipe
    r, w = os.pipe()
    try:
        pipe_no_wait(r)

        print(os.write(w, b'xxx'))
        print(os.read(r, 1024))

        try:
            print(os.write(w, b'yyy'))
            print(os.read(r, 1024))
            # The pipe is empty now: this read must fail instead of blocking.
            print(os.read(r, 1024))
        except OSError as e:
            if os.name == 'nt':
                # Read the Win32 error code once, before any print() call
                # gets a chance to overwrite the thread's last-error value.
                last_error = GetLastError()
                print(dir(e), e.errno, last_error)
                print(WinError(last_error))
                if last_error != ERROR_NO_DATA:
                    raise
            else:
                print(dir(e), e.errno)
                # An empty non-blocking pipe surfaces as BlockingIOError.
                if not isinstance(e, BlockingIOError):
                    raise
    finally:
        # Do not leak the two descriptors created by os.pipe().
        os.close(r)
        os.close(w)


This answer is basically @anatolytechtonik's answer but with classes.

import os
from typing import Tuple

# Win32 error code reported when a read on a PIPE_NOWAIT pipe finds no data.
ERROR_NO_DATA = 232

_IS_WINDOWS = os.name == "nt"

if _IS_WINDOWS:
    # These names only exist on Windows; they are used to flip the pipe
    # handle into PIPE_NOWAIT mode and to read the Win32 error code.
    import msvcrt
    from ctypes import windll, byref, wintypes, GetLastError, WinError, POINTER
    from ctypes.wintypes import HANDLE, DWORD, BOOL

    LPDWORD = POINTER(DWORD)
    PIPE_NOWAIT = wintypes.DWORD(0x00000001)


class AdvancedFD:
    """
    A wrapper for a file descriptor so that we can call:
        `<AdvancedFD>.read(number_of_bytes)` and
        `<AdvancedFD>.write(data_as_bytes)`

    After `config_non_blocking()` succeeded, reading while no data is
    available returns b"" instead of blocking.

    Methods:
        write(data: bytes) -> None
        read(number_of_bytes: int) -> bytes
        config_non_blocking() -> bool
        rawfd() -> int
        close() -> None
    """

    def __init__(self, fd: int):
        self.fd = fd
        self.closed = False

    def __del__(self) -> None:
        """
        When this object is garbage collected close the fd
        """
        try:
            self.close()
        except OSError:
            # The descriptor was already closed behind our back; there is
            # nothing useful to do about that during finalisation.
            pass

    def close(self) -> None:
        """
        Closes the file descriptor.
        Note: it cannot be reopened and might raise an error if it is
        being used. You don't have to call this function. It is automatically
        called when this object is being garbage collected.
        Calling it more than once is harmless.
        """
        # `closed` is missing when `__init__` never ran; treat that as
        # "nothing to close".
        if getattr(self, "closed", True):
            return
        # Flag first so a failing os.close() is never retried on a descriptor
        # number that may have been reused in the meantime.
        self.closed = True
        os.close(self.fd)

    def write(self, data: bytes) -> None:
        """
        Writes a string of bytes to the file descriptor.
        Note: Must be bytes.
        """
        os.write(self.fd, data)

    def read(self, x: int) -> bytes:
        """
        Reads up to `x` bytes from the file descriptor.
        Note: `x` must be an int
              Returns the bytes. Use `<bytes>.decode()` to convert it to a str
              Returns b"" when the descriptor is non blocking and there is
              currently nothing to read.
        Raises OSError for any other read failure.
        """
        try:
            return os.read(self.fd, x)
        except BlockingIOError:
            # Non-blocking descriptor with nothing to read right now.
            return b""
        except OSError as error:
            if not _IS_WINDOWS:
                raise
            # Fetch the Win32 error code immediately, before anything else
            # can overwrite the thread's last-error value.
            err_code = GetLastError()
            # If the error code is `ERROR_NO_DATA`
            if err_code == ERROR_NO_DATA:
                # Return an empty string of bytes
                return b""
            # Otherwise raise the error. The message is concatenated before
            # formatting so both placeholders are actually filled in.
            website = ("https://docs.microsoft.com/en-us/windows/win32/"
                       "debug/system-error-codes--0-499-")
            message = ("An exception occured. Error code: %i Look up"
                       " the error code here: %s") % (err_code, website)
            raise OSError(message) from error

    def config_non_blocking(self) -> bool:
        """
        Makes the file descriptor non blocking.
        Returns `True` if sucessfull, otherwise returns `False`
        """
        if not _IS_WINDOWS:
            try:
                os.set_blocking(self.fd, False)
            except OSError:
                return False
            return True

        # Windows approach taken from:
        # https://stackoverflow.com/a/34504971/11106801
        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
        SetNamedPipeHandleState.restype = BOOL

        handle = msvcrt.get_osfhandle(self.fd)

        res = SetNamedPipeHandleState(handle, byref(PIPE_NOWAIT), None, None)
        return res != 0

    def rawfd(self) -> int:
        """
        Returns the raw fd as an int.
        """
        return self.fd


class NonBlockingPipe:
    """
    Creates 2 file descriptors and wrapps them in the `AdvancedFD` class
    so that we can call:
        `<AdvancedFD>.read(number_of_bytes)` and
        `<AdvancedFD>.write(data_as_bytes)`
    It also makes the `read_fd` non blocking. When reading from a non-blocking
    pipe with no data it returns b"".

    Methods:
        write(data: bytes) -> None
        read(number_of_bytes: int) -> bytes
        rawfds() -> (int, int)
        close() -> None
    """

    def __init__(self):
        self.read_fd, self.write_fd = self.create_pipes()
        # A pipe whose read end silently stayed blocking would hang callers,
        # so fail loudly (after releasing both ends) instead.
        if not self.read_fd.config_non_blocking():
            self.close()
            raise OSError("Could not make the read end of the pipe "
                          "non blocking")

    def __del__(self) -> None:
        """
        When this object is garbage collected close the fds
        """
        try:
            self.close()
        except OSError:
            # Same reasoning as in `AdvancedFD.__del__`.
            pass

    def close(self) -> None:
        """
        Closes both ends of the pipe.
        Note: it cannot be reopened and might raise an error if it is
        being used. You don't have to call this function. It is automatically
        called when this object is being garbage collected.
        """
        # Nothing to do when `__init__` failed before the pipe existed.
        if not hasattr(self, "read_fd"):
            return
        try:
            self.read_fd.close()
        finally:
            # Close the write end even if closing the read end failed.
            self.write_fd.close()

    def create_pipes(self) -> Tuple[AdvancedFD, AdvancedFD]:
        """
        Creates 2 file descriptors and wrapps them in the `AdvancedFD` class
        so that we can call:
            `<AdvancedFD>.read(number_of_bytes)` and
            `<AdvancedFD>.write(data_as_bytes)`
        Returns them in the form:
            (read_fd, write_fd)
        """
        read_fd, write_fd = os.pipe()
        return AdvancedFD(read_fd), AdvancedFD(write_fd)

    def write(self, data: bytes) -> None:
        """
        Writes a string of bytes to the write end of the pipe.
        Note: Must be bytes.
        """
        self.write_fd.write(data)

    def read(self, number_of_bytes: int) -> bytes:
        """
        Reads up to `number_of_bytes` bytes from the read end of the pipe.
        Note: `number_of_bytes` must be an int
              Returns the bytes. Use `<bytes>.decode()` to convert it to a str
              Returns b"" when there is currently nothing to read.
        """
        return self.read_fd.read(number_of_bytes)

    def rawfds(self) -> Tuple[int, int]:
        """
        Returns the raw file descriptors as ints in the form:
            (read_fd, write_fd)
        """
        return self.read_fd.rawfd(), self.write_fd.rawfd()


if __name__ == "__main__":
    # Create the nonblocking pipe
    pipe = NonBlockingPipe()

    pipe.write(b"xxx")
    print(pipe.read(1024))  # Check if it can still read properly

    pipe.write(b"yyy")
    print(pipe.read(1024))  # Read all of the data in the pipe
    print(pipe.read(1024))  # Check if it is non blocking