controlling less with Popen controlling less with Popen unix unix

controlling less with Popen


It's a bit involved, but it's possible to use forkpty(3) to create a new TTY in which you have full control over less, forwarding input and output to the original TTY so that it feels seamless.

The code below uses Python 3 and its standard library. pexpect can do a lot of the heavy lifting but that doesn't ship with Python. Plus it's more educational this way.

import contextlibimport fcntlimport ioimport osimport ptyimport selectimport signalimport structimport termiosimport timeimport tty

Assume the rest of the code is indented to run within this context manager.

with contextlib.ExitStack() as stack:

We need to grab the real TTY and set it to raw mode. This can confuse other users of the TTY (for example, the shell after this program exits), so make sure to put it back to the same state after.

tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)stack.callback(os.close, tty_fd)tc = termios.tcgetattr(tty_fd)stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)tty.setraw(tty_fd, when=termios.TCSANOW)

Then we can invoke forkpty, which is named pty.fork() in Python. This does a couple things:

  • Creates a pseudoterminal.
  • Forks a new child.
  • Attach the child to the slave end of the PTY.
  • Return the child's PID and the master end of the PTY to the original process.

The child should run less. Note the use of _exit(2) as it can be unsafe to continue executing other code after a fork.

child_pid, master_fd = pty.fork()if child_pid == 0:    os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R'))    os._exit(0)stack.callback(os.close, master_fd)

Then there's a bit of work involved to set up a few asynchronous signal handlers.

  • SIGCHLD is received when a child process changes state (such as exiting). We can use this to keep track of whether the child is still running.
  • SIGWINCH is received when the controlling terminal changes size. We forward this size to the PTY (which will automatically send another window change signal to the processes attached to it). We should set the PTY's window size to match to start, too.

It may also make sense to forward signals such as SIGINT, SIGTERM, etc.

child_is_running = Truedef handle_chld(signum, frame):    while True:        pid, status = os.waitpid(-1, os.P_NOWAIT)        if not pid:            break        if pid == child_pid:            child_is_running = Falsedef handle_winch(signum, frame):    tc = struct.pack('HHHH', 0, 0, 0, 0)    tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc)    fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)handler = signal.signal(signal.SIGCHLD, handle_chld)stack.callback(signal.signal, signal.SIGCHLD, handler)handler = signal.signal(signal.SIGWINCH, handle_winch)stack.callback(signal.signal, signal.SIGWINCH, handler)handle_winch(0, None)

Now for the real meat: copying data between the real and fake TTY.

target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1has_sent_q = Falsewith contextlib.suppress(OSError):    while child_is_running:        now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)        if now < target_time:            timeout = target_time - now        else:            timeout = None            if not has_sent_q:                os.write(master_fd, b'q')                has_sent_q = True        rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout)        if tty_fd in rfds:            data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)            os.write(master_fd, data)        if master_fd in rfds:            data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)            os.write(tty_fd, data)

It looks straightforward, although I'm glossing over a few things, such as proper short write and SIGTTIN/SIGTTOU handling (partly hidden by suppressing OSError).