
blocks - send input to python subprocess pipeline


I found out how to do it.

It is not about threads, and not about select().

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Let's call them a and b.

When I run the second process, b gets passed to cut's stdin. But there is a brain-dead default on Popen - close_fds=False.

The effect of that is that cut also inherits a, so grep can't die even if I close a in my own process, because a is still open in cut's process (cut just ignores it) and grep never sees EOF on its stdin.

The following code now runs perfectly.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"

close_fds=True SHOULD BE THE DEFAULT on Unix systems. On Windows it closes all fds, which prevents piping.

EDIT:

PS: For people with a similar problem reading this answer: as pooryorick said in a comment, this can also block if the data written to p1.stdin is bigger than the pipe buffers. In that case you should chunk the data into smaller pieces and use select.select() to know when to read and when to write. The code in the question should give a hint on how to implement that.
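As a rough sketch of that approach (mine, not the code from the question), you can drive both ends with select.select() and os.read/os.write so that neither pipe buffer fills up unattended; the 4096-byte chunk size and the oversized payload are just placeholders:

import os
import select
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data = b"Hello World\n" * 100000       # hypothetical payload larger than the pipe buffers
wfd = p1.stdin.fileno()                # write end we feed
rfd = p2.stdout.fileno()               # read end we drain
chunks = []

while True:
    wlist = [wfd] if data else []
    rlist, wready, _ = select.select([rfd], wlist, [])
    if rfd in rlist:
        block = os.read(rfd, 4096)     # read whatever is available right now
        if not block:                  # EOF: the pipeline has drained
            break
        chunks.append(block)
    if wfd in wready:
        n = os.write(wfd, data[:4096]) # write at most one small chunk
        data = data[n:]
        if not data:
            p1.stdin.close()           # send EOF so grep, and then cut, can finish

result = b"".join(chunks)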

EDIT2: Found another solution, with more help from pooryorick - instead of using close_fds=True and closing ALL fds, one could close the fds that belong to the first process when executing the second, and it will work. The closing must be done in the child, so the preexec_fn argument to Popen comes in very handy for exactly that. On executing p2 you can do:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)


Working with large files

Two principles need to be applied uniformly when working with large files in Python.

  1. Since any IO routine can block, we must keep each stage of the pipeline in a different thread or process. We use threads in this example, but subprocesses would let you avoid the GIL.
  2. We must use incremental reads and writes so that we don't wait for EOF before starting to make progress.

An alternative is to use nonblocking IO, though this is cumbersome in standard Python. See gevent for a lightweight threading library that implements the synchronous IO API using nonblocking primitives.

Example code

We'll construct a silly pipeline that is roughly

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

where each stage in braces {} is implemented in Python while the others use standard external programs. TL;DR: See this gist.

We start with the expected imports.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Python stages of the pipeline

All but the last Python-implemented stage of the pipeline needs to go in a thread so that its IO does not block the others. These could instead run in Python subprocesses if you wanted them to actually run in parallel (and avoid the GIL).

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()

def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()

def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Each of these needs to be put in its own thread, which we'll do using this convenience function.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Create the pipeline

Create the external stages using Popen and the Python stages using spawn. The argument bufsize=-1 says to use the system default buffering (usually 4 KiB). This is generally faster than Popen's default of unbuffered IO or than line buffering, but you'll want line buffering if you want to monitor the output visually without lag.

grepv = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut   = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Drive the pipeline

Assembled as above, all the buffers in the pipeline will fill up, but since nobody is reading from the end (grepz.stdout), they will all block. We could read the entire thing in one call to grepz.stdout.read(), but that would use a lot of memory for large files. Instead, we read incrementally.

for line in grepz.stdout:
    sys.stdout.write(line.lower())

The threads and processes clean up once they reach EOF. We can explicitly clean up using

for t in [twriter, tfilter, tleeter]:
    t.join()
for p in [grepv, cut, grepk, grepz]:
    p.wait()

Python-2.6 and earlier

Internally, subprocess.Popen calls fork, configures the pipe file descriptors, and calls exec. The child process from fork has copies of all file descriptors in the parent process, and both copies will need to be closed before the corresponding reader will get EOF. This can be fixed by manually closing the pipes (either by close_fds=True or a suitable preexec_fn argument to subprocess.Popen) or by setting the FD_CLOEXEC flag to have exec automatically close the file descriptor. This flag is set automatically in Python-2.7 and later, see issue12786. We can get the Python-2.7 behavior in earlier versions of Python by calling

p._set_cloexec_flags(p.stdin)

before passing p.stdin as an argument to a subsequent subprocess.Popen.
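If you would rather not rely on that private method, an alternative (my sketch, not from the original answer) is to set FD_CLOEXEC yourself with the standard fcntl module on any descriptor you do not want leaked into subsequently spawned children:

import fcntl

def set_cloexec(fileobj):
    # Mark the descriptor close-on-exec so that a later exec() in a child
    # process closes it automatically.
    fd = fileobj.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

# e.g. mark the write end of an earlier pipe before spawning the next process:
# set_cloexec(p.stdin)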


There are three main tricks to making pipes work as expected

  1. Make sure each end of the pipe is used in a different thread/process (some of the examples near the top suffer from this problem).

  2. Explicitly close the unused end of the pipe in each process (see the sketch just after this list).

  3. Deal with buffering by either disabling it (Python's -u option), using ptys, or simply filling up the buffer with something that won't affect the data (maybe '\n', or whatever fits).
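As a minimal sketch of trick 2 (reusing the grep | cut pipeline from the accepted answer, nothing new assumed), the parent closes its copy of the intermediate pipe end as soon as it has been handed to the second process, so each end is held by exactly one process:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

# Trick 2: the parent no longer needs its copy of the read end of p1's
# output pipe.  Closing it lets p1 receive SIGPIPE if p2 exits early and
# lets p2 see EOF as soon as p1 closes its write end.
p1.stdout.close()

p1.stdin.write(b'Hello World\n')
p1.stdin.close()
print(p2.stdout.read())    # prints the cut-down line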

The examples in the Python "pipeline" module (I'm the author) fit your scenario exactly, and make the low-level steps fairly clear.

http://pypi.python.org/pypi/pipeline/

More recently, I used the subprocess module as part of a producer-processor-consumer-controller pattern:

http://www.darkarchive.org/w/Pub/PythonInteract

This example deals with buffered stdin without resorting to a pty, and also illustrates which pipe ends should be closed where. I prefer processes to threading, but the principle is the same. Additionally, it illustrates synchronizing Queues that feed the producer and collect output from the consumer, and how to shut them down cleanly (look out for the sentinels inserted into the queues). This pattern allows new input to be generated based on recent output, allowing for recursive discovery and processing.
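Here is a tiny sketch of that sentinel idea (my own minimal example, not the code from the linked page): workers communicate over Queues, and a None sentinel tells each stage that there is no more input.

from multiprocessing import Process, Queue

SENTINEL = None   # marker meaning "no more input"

def producer(out_q):
    for item in ['alpha', 'beta', 'gamma']:   # hypothetical input source
        out_q.put(item)
    out_q.put(SENTINEL)                       # tell the consumer to stop

def consumer(in_q, result_q):
    while True:
        item = in_q.get()
        if item is SENTINEL:                  # clean shutdown on sentinel
            result_q.put(SENTINEL)
            break
        result_q.put(item.upper())            # stand-in for real processing

if __name__ == '__main__':
    work_q, result_q = Queue(), Queue()
    Process(target=producer, args=(work_q,)).start()
    Process(target=consumer, args=(work_q, result_q)).start()
    while True:
        out = result_q.get()
        if out is SENTINEL:
            break
        print(out)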