controlling less with Popen
It's a bit involved, but it's possible to use forkpty(3)
to create a new TTY in which you have full control over less
, forwarding input and output to the original TTY so that it feels seamless.
The code below uses Python 3 and its standard library. pexpect can do a lot of the heavy lifting but that doesn't ship with Python. Plus it's more educational this way.
import contextlibimport fcntlimport ioimport osimport ptyimport selectimport signalimport structimport termiosimport timeimport tty
Assume the rest of the code is indented to run within this context manager.
with contextlib.ExitStack() as stack:
We need to grab the real TTY and set it to raw mode. This can confuse other users of the TTY (for example, the shell after this program exits), so make sure to put it back to the same state after.
tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)stack.callback(os.close, tty_fd)tc = termios.tcgetattr(tty_fd)stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)tty.setraw(tty_fd, when=termios.TCSANOW)
Then we can invoke forkpty
, which is named pty.fork()
in Python. This does a couple things:
- Creates a pseudoterminal.
- Forks a new child.
- Attach the child to the slave end of the PTY.
- Return the child's PID and the master end of the PTY to the original process.
The child should run less
. Note the use of _exit(2)
as it can be unsafe to continue executing other code after a fork
.
child_pid, master_fd = pty.fork()if child_pid == 0: os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R')) os._exit(0)stack.callback(os.close, master_fd)
Then there's a bit of work involved to set up a few asynchronous signal handlers.
SIGCHLD
is received when a child process changes state (such as exiting). We can use this to keep track of whether the child is still running.SIGWINCH
is received when the controlling terminal changes size. We forward this size to the PTY (which will automatically send another window change signal to the processes attached to it). We should set the PTY's window size to match to start, too.
It may also make sense to forward signals such as SIGINT
, SIGTERM
, etc.
child_is_running = Truedef handle_chld(signum, frame): while True: pid, status = os.waitpid(-1, os.P_NOWAIT) if not pid: break if pid == child_pid: child_is_running = Falsedef handle_winch(signum, frame): tc = struct.pack('HHHH', 0, 0, 0, 0) tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc) fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)handler = signal.signal(signal.SIGCHLD, handle_chld)stack.callback(signal.signal, signal.SIGCHLD, handler)handler = signal.signal(signal.SIGWINCH, handle_winch)stack.callback(signal.signal, signal.SIGWINCH, handler)handle_winch(0, None)
Now for the real meat: copying data between the real and fake TTY.
target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1has_sent_q = Falsewith contextlib.suppress(OSError): while child_is_running: now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) if now < target_time: timeout = target_time - now else: timeout = None if not has_sent_q: os.write(master_fd, b'q') has_sent_q = True rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout) if tty_fd in rfds: data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE) os.write(master_fd, data) if master_fd in rfds: data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE) os.write(tty_fd, data)
It looks straightforward, although I'm glossing over a few things, such as proper short write and SIGTTIN
/SIGTTOU
handling (partly hidden by suppressing OSError
).