1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
3 Requires that platform supports, eg, posix-style os.pipe and os.fork.
5 Subprocess class features:
7 - provides non-blocking stdin and stderr reads
9 - provides subprocess stop and continue, kill-on-deletion
11 - provides detection of subprocess startup failure
13 - Subprocess objects have nice, informative string rep (as every good object
16 - RecordFile class provides record-oriented IO for file-like stream objects.
19 __version__ = "Revision: 1.7 "
21 # Id: subproc.py,v 1.7 1998
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 # and subprocesses, primarily one by jose pereira.
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 # because i don't know what all in it is portable and what is not. I'm not
30 # about to provide for different platform contingencies - at that extent, the
31 # effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 # checked on regular IO, when a handler is defined, and passed to the
34 # handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
37 # - new additions (1.3, 1.4):
38 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 # reads based solely on os.read with select, while capitalizing big-time on
40 # multi-char read chunking.
41 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
43 # ken.manheimer@nist.gov
46 import sys, os, string, time, types
51 SubprocessError = 'SubprocessError'
52 # You may need to increase execvp_grace_seconds, if you have a large or slow
54 execvp_grace_seconds = 0.5
57 """Run and communicate asynchronously with a subprocess.
59 Provides non-blocking reads in the form of .readPendingChars and
62 .readline will block until it gets a complete line.
64 .peekPendingChar does a non-blocking, non-consuming read for pending
65 output, and can be used before .readline to check non-destructively for
66 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
67 a new character is pending, or timeout secs pass, with granularity of
70 There are corresponding read and peekPendingErrXXX routines, to read from
71 the subprocess stderr stream."""
75 expire_noisily = 1 # Announce subproc destruction?
77 readbuf = 0 # fork will assign to be a readbuf obj
78 errbuf = 0 # fork will assign to be a readbuf obj
80 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
81 in_fd=0, out_fd=1, err_fd=2):
82 """Launch a subprocess, given command string COMMAND."""
85 self.expire_noisily = expire_noisily
86 self.control_stderr = control_stderr
87 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
90 def fork(self, cmd=None):
91 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
92 if cmd: self.cmd = cmd
94 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
95 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
96 pRe, cWe = os.pipe() # parent-read-error, child-write-error
97 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
101 if self.pid == 0: #### CHILD ####
102 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
103 # Reopen stdin, out, err, on pipe ends:
104 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
105 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
106 if self.control_stderr:
107 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
108 # Ensure (within reason) stray file descriptors are closed:
109 excludes = [self.in_fd, self.out_fd, self.err_fd]
110 for i in range(4,100):
111 if i not in excludes:
113 except os.error: pass
116 os._exit(1) # Shouldn't get here.
119 # Connect to the child's file descriptors, using our customized
121 self.toChild = os.fdopen(pWc, 'w')
122 self.toChild_fdlist = [pWc]
123 self.readbuf = ReadBuf(pRc)
124 self.errbuf = ReadBuf(pRe)
125 time.sleep(execvp_grace_seconds)
127 pid, err = os.waitpid(self.pid, os.WNOHANG)
128 except os.error, (errno, msg):
130 raise SubprocessError, \
131 "Subprocess '%s' failed." % self.cmd
132 raise SubprocessError, \
133 "Subprocess failed[%d]: %s" % (errno, msg)
135 # child exited already
138 rc = (err & 0xff00) >> 8
140 raise SubprocessError, (
141 "child killed by signal %d with a return code of %d"
144 raise SubprocessError, \
145 "child exited with return code %d" % rc
146 # Child may have exited, but not in error, so we won't say
147 # anything more at this point.
149 def run_cmd(self, cmd):
150 cmd = string.split(self.cmd)
153 os.execvp(cmd[0], cmd)
154 os._exit(1) # Shouldn't get here
157 if self.control_stderr:
158 os.dup2(parentErr, 2) # Reconnect to parent's stdout
159 sys.stderr.write("**execvp failed, '%s'**\n" %
164 ### Write input to subprocess ###
166 def write(self, str):
167 """Write a STRING to the subprocess."""
170 raise SubprocessError, "no child" # ===>
171 if select.select([],self.toChild_fdlist,[],0)[1]:
172 self.toChild.write(str)
175 # XXX Can write-buffer full be handled better??
176 raise IOError, "write to %s blocked" % self # ===>
178 def writeline(self, line=''):
179 """Write STRING, with added newline termination, to subprocess."""
180 self.write(line + '\n')
182 ### Get output from subprocess ###
184 def peekPendingChar(self):
185 """Return, but (effectively) do not consume a single pending output
186 char, or return null string if none pending."""
188 return self.readbuf.peekPendingChar() # ===>
189 def peekPendingErrChar(self):
190 """Return, but (effectively) do not consume a single pending output
191 char, or return null string if none pending."""
193 return self.errbuf.peekPendingChar() # ===>
195 def waitForPendingChar(self, timeout, pollPause=.1):
196 """Block max TIMEOUT secs until we peek a pending char, returning the
197 char, or '' if none encountered.
199 Pause POLLPAUSE secs (default .1) between polls."""
203 nextChar = self.readbuf.peekPendingChar()
204 if nextChar or (accume > timeout): return nextChar
205 time.sleep(pollPause)
206 accume = accume + pollPause
208 def read(self, n=None):
209 """Read N chars, or all pending if no N specified."""
211 return self.readPendingChars()
214 got0 = self.readPendingChars(n)
218 def readPendingChars(self, max=None):
219 """Read all currently pending subprocess output as a single string."""
220 return self.readbuf.readPendingChars(max)
221 def readPendingErrChars(self):
222 """Read all currently pending subprocess error output as a single
224 if self.control_stderr:
225 return self.errbuf.readPendingChars()
227 raise SubprocessError, "Haven't grabbed subprocess error stream."
229 def readPendingLine(self):
230 """Read currently pending subprocess output, up to a complete line
231 (newline inclusive)."""
232 return self.readbuf.readPendingLine()
233 def readPendingErrLine(self):
234 """Read currently pending subprocess error output, up to a complete
235 line (newline inclusive)."""
236 if self.control_stderr:
237 return self.errbuf.readPendingLine()
239 raise SubprocessError, "Haven't grabbed subprocess error stream."
242 """Return next complete line of subprocess output, blocking until
244 return self.readbuf.readline()
245 def readlineErr(self):
246 """Return next complete line of subprocess error output, blocking until
248 if self.control_stderr:
249 return self.errbuf.readline()
251 raise SubprocessError, "Haven't grabbed subprocess error stream."
253 ### Subprocess Control ###
256 """True if subprocess is alive and kicking."""
257 return self.status(boolean=1)
258 def status(self, boolean=0):
259 """Return string indicating whether process is alive or dead."""
262 status = 'sans command'
264 status = 'sans process'
265 elif not self.cont():
266 status = "(unresponding) '%s'" % self.cmd
268 status = "'%s'" % self.cmd
275 def stop(self, verbose=1):
276 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
279 os.kill(self.pid, signal.SIGSTOP)
282 print "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)
284 if verbose: print "Stopped '%s'" % self.cmd
287 def cont(self, verbose=0):
288 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
291 os.kill(self.pid, signal.SIGCONT)
294 print ("Continue failed for '%s' - '%s'" %
295 (self.cmd, sys.exc_value))
297 if verbose: print "Continued '%s'" % self.cmd
301 """Send process PID signal SIG (default 9, 'kill'), returning None once
302 it is successfully reaped.
304 SubprocessError is raised if process is not successfully killed."""
307 raise SubprocessError, "No process" # ===>
308 elif not self.cont():
309 raise SubprocessError, "Can't signal subproc %s" % self # ===>
311 # Try sending first a TERM and then a KILL signal.
313 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
316 os.kill(self.pid, sig[1])
320 # Try a couple or three times to reap the process with waitpid:
322 # WNOHANG == 1 on sunos, presumably same elsewhere.
323 if os.waitpid(self.pid, os.WNOHANG):
324 if self.expire_noisily:
325 print ("\n(%s subproc %d '%s' / %s)" %
326 (sig[0], self.pid, self.cmd,
328 for i in self.pipefiles:
333 # Only got here if subprocess is not gone:
334 raise (SubprocessError,
335 ("Failed kill of subproc %d, '%s', with signals %s" %
336 (self.pid, self.cmd, map(lambda(x): x[0], sigs))))
339 """Terminate the subprocess"""
344 status = self.status()
345 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
347 # The name of the class is a pun; it is short for "Process", but it also appeals
348 # to the word "Procedure"
349 class Subproc(Subprocess):
350 def run_cmd(self, cmd):
351 apply(cmd[0], cmd[1:])
354 #############################################################################
355 ##### Non-blocking read operations #####
356 #############################################################################
359 """Output buffer for non-blocking reads on selectable files like pipes and
360 sockets. Init with a file descriptor for the file."""
362 def __init__(self, fd, maxChunkSize=1024):
363 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
369 self.eof = 0 # May be set with stuff still in .buf
371 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
376 def peekPendingChar(self):
377 """Return, but don't consume, first character of unconsumed output from
378 file, or empty string if none."""
381 return self.buf[0] # ===>
386 sel = select.select([self.fd], [], [self.fd], 0)
390 self.buf = os.read(self.fd, self.chunkSize) # ===>
391 return self.buf[0] # Assume select don't lie.
392 else: return '' # ===>
395 def readPendingChar(self):
396 """Consume first character of unconsumed output from file, or empty
400 got, self.buf = self.buf[0], self.buf[1:]
406 sel = select.select([self.fd], [], [self.fd], 0)
410 return os.read(self.fd, 1) # ===>
411 else: return '' # ===>
413 def readPendingChars(self, max=None):
414 """Consume uncomsumed output from FILE, or empty string if nothing
419 got, self.buf = self.buf, ''
425 sel = select.select([self.fd], [], [self.fd], 0)
429 got = got + os.read(self.fd, self.chunkSize)
436 self.buf = self.buf + got[max:]
442 def readPendingLine(self, block=0):
443 """Return pending output from FILE, up to first newline (inclusive).
445 Does not block unless optional arg BLOCK is true.
447 Note that an error will be raised if a new eof is encountered without
451 to = string.find(self.buf, '\n')
453 got, self.buf = self.buf[:to+1], self.buf[to+1:]
455 got, self.buf = self.buf, ''
461 # Herein, 'got' contains the (former) contents of the buffer, and it
462 # doesn't include a newline.
464 period = block and 1.0 or 0 # don't be too busy while waiting
465 while 1: # (we'll only loop if block set)
466 sel = select.select(fdlist, [], fdlist, period)
470 got = got + os.read(self.fd, self.chunkSize)
472 to = string.find(got, '\n')
474 got, self.buf = got[:to+1], got[to+1:]
479 self.buf = '' # this is how an ordinary file acts...
481 # otherwise - no newline, blocking requested, no eof - loop. # ==^
484 """Return next output line from file, blocking until it is received."""
486 return self.readPendingLine(1) # ===>
489 #############################################################################
490 ##### Encapsulated reading and writing #####
491 #############################################################################
492 # Encapsulate messages so the end can be unambiguously identified, even
493 # when they contain multiple, possibly empty lines.
496 """Encapsulate stream object for record-oriented IO.
498 Particularly useful when dealing with non-line oriented communications
499 over pipes, eg with subprocesses."""
501 # Message is written preceded by a line containing the message length.
503 def __init__(self, f):
506 def write_record(self, s):
507 "Write so self.read knows exactly how much to read."
508 f = self.__dict__['file']
509 f.write("%s\n%s" % (len(s), s))
510 if hasattr(f, 'flush'):
513 def read_record(self):
514 "Read and reconstruct message as prepared by self.write."
515 f = self.__dict__['file']
516 line = f.readline()[:-1]
519 l = string.atoi(line)
521 raise IOError, ("corrupt %s file structure"
522 % self.__class__.__name__)
528 def __getattr__(self, attr):
529 """Implement characteristic file object attributes."""
530 f = self.__dict__['file']
532 return getattr(f, attr)
534 raise AttributeError, attr
537 return "<%s of %s at %s>" % (self.__class__.__name__,
538 self.__dict__['file'],
542 """Exercise encapsulated write/read with an arbitrary string.
544 Raise IOError if the string gets distorted through transmission!"""
545 from StringIO import StringIO
551 show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
553 raise IOError, "String distorted:\n%s" % show
555 #############################################################################
556 ##### An example subprocess interfaces #####
557 #############################################################################
560 """Convenient interface to CCSO 'ph' nameserver subprocess.
562 .query('string...') method takes a query and returns a list of dicts, each
563 of which represents one entry."""
565 # Note that i made this a class that handles a subprocess object, rather
566 # than one that inherits from it. I didn't see any functional
567 # disadvantages, and didn't think that full support of the entire
568 # Subprocess functionality was in any way suitable for interaction with
569 # this specialized interface. ? klm 13-Jan-1995
573 self.proc = Subprocess('ph', 1)
575 raise SubprocessError, ('failure starting ph: %s' % # ===>
579 """Send a query and return a list of dicts for responses.
581 Raise a ValueError if ph responds with an error."""
585 self.proc.writeline('query ' + q)
588 response = self.getreply() # Should get null on new prompt.
589 errs = self.proc.readPendingErrChars()
591 sys.stderr.write(errs)
597 elif type(response) == types.StringType:
598 raise ValueError, "ph failed match: '%s'" % response # ===>
599 for line in response:
601 line = string.splitfields(line, ':')
602 it[string.strip(line[0])] = (
603 string.strip(string.join(line[1:])))
606 """Consume next response from ph, returning list of lines or string
608 # Key on first char: (First line may lack newline.)
609 # - dash discard line
610 # - 'ph> ' conclusion of response
611 # - number error message
612 # - whitespace beginning of next response
614 nextChar = self.proc.waitForPendingChar(60)
616 raise SubprocessError, 'ph subprocess not responding' # ===>
617 elif nextChar == '-':
618 # dashed line - discard it, and continue reading:
620 return self.getreply() # ===>
621 elif nextChar == 'p':
622 # 'ph> ' prompt - don't think we should hit this, but what the hay:
624 elif nextChar in '0123456789':
625 # Error notice - we're currently assuming single line errors:
626 return self.proc.readline()[:-1] # ===>
627 elif nextChar in ' \t':
628 # Get content, up to next dashed line:
630 while nextChar != '-' and nextChar != '':
631 got.append(self.proc.readline()[:-1])
632 nextChar = self.proc.peekPendingChar()
635 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
638 """Clear-out initial preface or residual subproc input and output."""
639 pause = .5; maxIter = 10 # 5 seconds to clear
643 while iterations < maxIter:
644 got = got + self.proc.readPendingChars()
645 # Strip out all but the last incomplete line:
646 got = string.splitfields(got, '\n')[-1]
647 if got == 'ph> ': return # Ok. ===>
649 raise SubprocessError, ('ph not responding within %s secs' %
652 #############################################################################
654 #############################################################################
657 print "\tOpening subprocess:"
658 p = Subprocess('cat', 1) # set to expire noisily...
660 print "\tOpening bogus subprocess, should fail:"
662 b = Subprocess('/', 1)
663 print "\tOops! Null-named subprocess startup *succeeded*?!?"
664 except SubprocessError:
665 print "\t...yep, it failed."
666 print '\tWrite, then read, two newline-teriminated lines, using readline:'
667 p.write('first full line written\n'); p.write('second.\n')
670 print '\tThree lines, last sans newline, read using combination:'
671 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
672 print '\tFirst line via readline:'
674 print '\tRest via readPendingChars:'
675 print p.readPendingChars()
676 print "\tStopping then continuing subprocess (verbose):"
677 if not p.stop(1): # verbose stop
678 print '\t** Stop seems to have failed!'
680 print '\tWriting line while subprocess is paused...'
681 p.write('written while subprocess paused\n')
682 print '\tNonblocking read of paused subprocess (should be empty):'
683 print p.readPendingChars()
684 print '\tContinuing subprocess (verbose):'
685 if not p.cont(1): # verbose continue
686 print '\t** Continue seems to have failed! Probly lost subproc...'
689 print '\tReading accumulated line, blocking read:'
691 print "\tDeleting subproc, which was set to die noisily:"