1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 Subprocess class features:
8 - provides non-blocking stdin and stderr reads
10 - provides subprocess stop and continue, kill-on-deletion
12 - provides detection of subprocess startup failure
14 - Subprocess objects have nice, informative string rep (as every good object
17 __version__ = "Revision: 1.15 "
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 # and subprocesses, primarily one by jose pereira.
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 # because i don't know what all in it is portable and what is not. I'm not
28 # about to provide for different platform contingencies - at that extent, the
29 # effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 # checked on regular IO, when a handler is defined, and passed to the
32 # handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
35 # - new additions (1.3, 1.4):
36 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 # reads based solely on os.read with select, while capitalizing big-time on
38 # multi-char read chunking.
39 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
41 # ken.manheimer@nist.gov
44 import sys, os, string, time, types
49 SubprocessError = 'SubprocessError'
50 # You may need to increase execvp_grace_seconds, if you have a large or slow
52 execvp_grace_seconds = 0.5
55 """Run and communicate asynchronously with a subprocess.
57 Provides non-blocking reads in the form of .readPendingChars and
60 .readline will block until it gets a complete line.
62 .peekPendingChar does a non-blocking, non-consuming read for pending
63 output, and can be used before .readline to check non-destructively for
64 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
65 a new character is pending, or timeout secs pass, with granularity of
68 There are corresponding read and peekPendingErrXXX routines, to read from
69 the subprocess stderr stream."""
73 expire_noisily = 1 # Announce subproc destruction?
75 readbuf = 0 # fork will assign to be a readbuf obj
76 errbuf = 0 # fork will assign to be a readbuf obj
78 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
79 in_fd=0, out_fd=1, err_fd=2):
80 """Launch a subprocess, given command string COMMAND."""
83 self.expire_noisily = expire_noisily
84 self.control_stderr = control_stderr
85 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
88 def fork(self, cmd=None):
89 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
90 if cmd: self.cmd = cmd
92 cmd = string.split(self.cmd)
93 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
94 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
95 pRe, cWe = os.pipe() # parent-read-error, child-write-error
96 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
100 if self.pid == 0: #### CHILD ####
101 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
102 # Reopen stdin, out, err, on pipe ends:
103 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
104 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
105 if self.control_stderr:
106 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
107 # Ensure (within reason) stray file descriptors are closed:
108 excludes = [self.in_fd, self.out_fd, self.err_fd]
109 for i in range(4,100):
110 if i not in excludes:
112 except os.error: pass
115 os.execvp(cmd[0], cmd)
116 os._exit(1) # Shouldn't get here
119 if self.control_stderr:
120 os.dup2(parentErr, 2) # Reconnect to parent's stdout
121 sys.stderr.write("**execvp failed, '%s'**\n" %
124 os._exit(1) # Shouldn't get here.
127 # Connect to the child's file descriptors, using our customized
129 self.toChild = os.fdopen(pWc, 'w')
130 self.toChild_fdlist = [pWc]
131 self.readbuf = ReadBuf(pRc)
132 self.errbuf = ReadBuf(pRe)
133 time.sleep(execvp_grace_seconds)
135 pid, err = os.waitpid(self.pid, os.WNOHANG)
136 except os.error, (errno, msg):
138 raise SubprocessError, \
139 "Subprocess '%s' failed." % self.cmd
140 raise SubprocessError, \
141 "Subprocess failed[%d]: %s" % (errno, msg)
143 # child exited already
146 rc = (err & 0xff00) >> 8
148 raise SubprocessError, (
149 "child killed by signal %d with a return code of %d"
152 raise SubprocessError, \
153 "child exited with return code %d" % rc
154 # Child may have exited, but not in error, so we won't say
155 # anything more at this point.
157 ### Write input to subprocess ###
159 def write(self, str):
160 """Write a STRING to the subprocess."""
163 raise SubprocessError, "no child" # ===>
164 if select.select([],self.toChild_fdlist,[],0)[1]:
165 self.toChild.write(str)
168 # XXX Can write-buffer full be handled better??
169 raise IOError, "write to %s blocked" % self # ===>
171 def writeline(self, line=''):
172 """Write STRING, with added newline termination, to subprocess."""
173 self.write(line + '\n')
175 ### Get output from subprocess ###
177 def peekPendingChar(self):
178 """Return, but (effectively) do not consume a single pending output
179 char, or return null string if none pending."""
181 return self.readbuf.peekPendingChar() # ===>
182 def peekPendingErrChar(self):
183 """Return, but (effectively) do not consume a single pending output
184 char, or return null string if none pending."""
186 return self.errbuf.peekPendingChar() # ===>
188 def waitForPendingChar(self, timeout, pollPause=.1):
189 """Block max TIMEOUT secs until we peek a pending char, returning the
190 char, or '' if none encountered.
192 Pause POLLPAUSE secs (default .1) between polls."""
196 nextChar = self.readbuf.peekPendingChar()
197 if nextChar or (accume > timeout): return nextChar
198 time.sleep(pollPause)
199 accume = accume + pollPause
201 def read(self, n=None):
202 """Read N chars, or all pending if no N specified."""
204 return self.readPendingChars()
207 got0 = self.readPendingChars(n)
211 def readPendingChars(self, max=None):
212 """Read all currently pending subprocess output as a single string."""
213 return self.readbuf.readPendingChars(max)
214 def readPendingErrChars(self):
215 """Read all currently pending subprocess error output as a single
217 if self.control_stderr:
218 return self.errbuf.readPendingChars()
220 raise SubprocessError, "Haven't grabbed subprocess error stream."
222 def readPendingLine(self):
223 """Read currently pending subprocess output, up to a complete line
224 (newline inclusive)."""
225 return self.readbuf.readPendingLine()
226 def readPendingErrLine(self):
227 """Read currently pending subprocess error output, up to a complete
228 line (newline inclusive)."""
229 if self.control_stderr:
230 return self.errbuf.readPendingLine()
232 raise SubprocessError, "Haven't grabbed subprocess error stream."
235 """Return next complete line of subprocess output, blocking until
237 return self.readbuf.readline()
238 def readlineErr(self):
239 """Return next complete line of subprocess error output, blocking until
241 if self.control_stderr:
242 return self.errbuf.readline()
244 raise SubprocessError, "Haven't grabbed subprocess error stream."
246 ### Subprocess Control ###
249 """True if subprocess is alive and kicking."""
250 return self.status(boolean=1)
251 def status(self, boolean=0):
252 """Return string indicating whether process is alive or dead."""
255 status = 'sans command'
257 status = 'sans process'
258 elif not self.cont():
259 status = "(unresponding) '%s'" % self.cmd
261 status = "'%s'" % self.cmd
268 def stop(self, verbose=1):
269 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
272 os.kill(self.pid, signal.SIGSTOP)
275 print "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)
277 if verbose: print "Stopped '%s'" % self.cmd
280 def cont(self, verbose=0):
281 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
284 os.kill(self.pid, signal.SIGCONT)
287 print ("Continue failed for '%s' - '%s'" %
288 (self.cmd, sys.exc_value))
290 if verbose: print "Continued '%s'" % self.cmd
294 """Send process PID signal SIG (default 9, 'kill'), returning None once
295 it is successfully reaped.
297 SubprocessError is raised if process is not successfully killed."""
300 raise SubprocessError, "No process" # ===>
301 elif not self.cont():
302 raise SubprocessError, "Can't signal subproc %s" % self # ===>
304 # Try sending first a TERM and then a KILL signal.
306 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
309 os.kill(self.pid, sig[1])
313 # Try a couple or three times to reap the process with waitpid:
315 # WNOHANG == 1 on sunos, presumably same elsewhere.
316 if os.waitpid(self.pid, os.WNOHANG):
317 if self.expire_noisily:
318 print ("\n(%s subproc %d '%s' / %s)" %
319 (sig[0], self.pid, self.cmd,
321 for i in self.pipefiles:
326 # Only got here if subprocess is not gone:
327 raise (SubprocessError,
328 ("Failed kill of subproc %d, '%s', with signals %s" %
329 (self.pid, self.cmd, map(lambda(x): x[0], sigs))))
332 """Terminate the subprocess"""
337 status = self.status()
338 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
340 #############################################################################
341 ##### Non-blocking read operations #####
342 #############################################################################
345 """Output buffer for non-blocking reads on selectable files like pipes and
346 sockets. Init with a file descriptor for the file."""
348 def __init__(self, fd, maxChunkSize=1024):
349 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
355 self.eof = 0 # May be set with stuff still in .buf
357 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
362 def peekPendingChar(self):
363 """Return, but don't consume, first character of unconsumed output from
364 file, or empty string if none."""
367 return self.buf[0] # ===>
372 sel = select.select([self.fd], [], [self.fd], 0)
376 self.buf = os.read(self.fd, self.chunkSize) # ===>
377 return self.buf[0] # Assume select don't lie.
378 else: return '' # ===>
381 def readPendingChar(self):
382 """Consume first character of unconsumed output from file, or empty
386 got, self.buf = self.buf[0], self.buf[1:]
392 sel = select.select([self.fd], [], [self.fd], 0)
396 return os.read(self.fd, 1) # ===>
397 else: return '' # ===>
399 def readPendingChars(self, max=None):
400 """Consume uncomsumed output from FILE, or empty string if nothing
405 if (max > 0) and (len(self.buf) > max):
406 got = self.buf[0:max]
407 self.buf = self.buf[max:]
409 got, self.buf = self.buf, ''
415 sel = select.select([self.fd], [], [self.fd], 0)
419 got = got + os.read(self.fd, self.chunkSize)
426 self.buf = self.buf + got[max:]
432 def readPendingLine(self, block=0):
433 """Return pending output from FILE, up to first newline (inclusive).
435 Does not block unless optional arg BLOCK is true.
437 Note that an error will be raised if a new eof is encountered without
441 to = string.find(self.buf, '\n')
443 got, self.buf = self.buf[:to+1], self.buf[to+1:]
445 got, self.buf = self.buf, ''
451 # Herein, 'got' contains the (former) contents of the buffer, and it
452 # doesn't include a newline.
454 period = block and 1.0 or 0 # don't be too busy while waiting
455 while 1: # (we'll only loop if block set)
456 sel = select.select(fdlist, [], fdlist, period)
460 got = got + os.read(self.fd, self.chunkSize)
462 to = string.find(got, '\n')
464 got, self.buf = got[:to+1], got[to+1:]
469 self.buf = '' # this is how an ordinary file acts...
471 # otherwise - no newline, blocking requested, no eof - loop. # ==^
474 """Return next output line from file, blocking until it is received."""
476 return self.readPendingLine(1) # ===>
479 #############################################################################
480 ##### Encapsulated reading and writing #####
481 #############################################################################
482 # Encapsulate messages so the end can be unambiguously identified, even
483 # when they contain multiple, possibly empty lines.
486 """Encapsulate stream object for record-oriented IO.
488 Particularly useful when dealing with non-line oriented communications
489 over pipes, eg with subprocesses."""
491 # Message is written preceded by a line containing the message length.
493 def __init__(self, f):
496 def write_record(self, s):
497 "Write so self.read knows exactly how much to read."
498 f = self.__dict__['file']
499 f.write("%s\n%s" % (len(s), s))
500 if hasattr(f, 'flush'):
503 def read_record(self):
504 "Read and reconstruct message as prepared by self.write."
505 f = self.__dict__['file']
506 line = f.readline()[:-1]
509 l = string.atoi(line)
511 raise IOError, ("corrupt %s file structure"
512 % self.__class__.__name__)
518 def __getattr__(self, attr):
519 """Implement characteristic file object attributes."""
520 f = self.__dict__['file']
522 return getattr(f, attr)
524 raise AttributeError, attr
527 return "<%s of %s at %s>" % (self.__class__.__name__,
528 self.__dict__['file'],
532 """Exercise encapsulated write/read with an arbitrary string.
534 Raise IOError if the string gets distorted through transmission!"""
535 from StringIO import StringIO
541 show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
543 raise IOError, "String distorted:\n%s" % show
545 #############################################################################
546 ##### An example subprocess interfaces #####
547 #############################################################################
550 """Convenient interface to CCSO 'ph' nameserver subprocess.
552 .query('string...') method takes a query and returns a list of dicts, each
553 of which represents one entry."""
555 # Note that i made this a class that handles a subprocess object, rather
556 # than one that inherits from it. I didn't see any functional
557 # disadvantages, and didn't think that full support of the entire
558 # Subprocess functionality was in any way suitable for interaction with
559 # this specialized interface. ? klm 13-Jan-1995
563 self.proc = Subprocess('ph', 1)
565 raise SubprocessError, ('failure starting ph: %s' % # ===>
569 """Send a query and return a list of dicts for responses.
571 Raise a ValueError if ph responds with an error."""
575 self.proc.writeline('query ' + q)
578 response = self.getreply() # Should get null on new prompt.
579 errs = self.proc.readPendingErrChars()
581 sys.stderr.write(errs)
587 elif type(response) == types.StringType:
588 raise ValueError, "ph failed match: '%s'" % response # ===>
589 for line in response:
591 line = string.splitfields(line, ':')
592 it[string.strip(line[0])] = (
593 string.strip(string.join(line[1:])))
596 """Consume next response from ph, returning list of lines or string
598 # Key on first char: (First line may lack newline.)
599 # - dash discard line
600 # - 'ph> ' conclusion of response
601 # - number error message
602 # - whitespace beginning of next response
604 nextChar = self.proc.waitForPendingChar(60)
606 raise SubprocessError, 'ph subprocess not responding' # ===>
607 elif nextChar == '-':
608 # dashed line - discard it, and continue reading:
610 return self.getreply() # ===>
611 elif nextChar == 'p':
612 # 'ph> ' prompt - don't think we should hit this, but what the hay:
614 elif nextChar in '0123456789':
615 # Error notice - we're currently assuming single line errors:
616 return self.proc.readline()[:-1] # ===>
617 elif nextChar in ' \t':
618 # Get content, up to next dashed line:
620 while nextChar != '-' and nextChar != '':
621 got.append(self.proc.readline()[:-1])
622 nextChar = self.proc.peekPendingChar()
625 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
628 """Clear-out initial preface or residual subproc input and output."""
629 pause = .5; maxIter = 10 # 5 seconds to clear
633 while iterations < maxIter:
634 got = got + self.proc.readPendingChars()
635 # Strip out all but the last incomplete line:
636 got = string.splitfields(got, '\n')[-1]
637 if got == 'ph> ': return # Ok. ===>
639 raise SubprocessError, ('ph not responding within %s secs' %
642 #############################################################################
644 #############################################################################
647 print "\tOpening subprocess:"
648 p = Subprocess('cat', 1) # set to expire noisily...
650 print "\tOpening bogus subprocess, should fail:"
652 b = Subprocess('/', 1)
653 print "\tOops! Null-named subprocess startup *succeeded*?!?"
654 except SubprocessError:
655 print "\t...yep, it failed."
656 print '\tWrite, then read, two newline-teriminated lines, using readline:'
657 p.write('first full line written\n'); p.write('second.\n')
660 print '\tThree lines, last sans newline, read using combination:'
661 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
662 print '\tFirst line via readline:'
664 print '\tRest via readPendingChars:'
665 print p.readPendingChars()
666 print "\tStopping then continuing subprocess (verbose):"
667 if not p.stop(1): # verbose stop
668 print '\t** Stop seems to have failed!'
670 print '\tWriting line while subprocess is paused...'
671 p.write('written while subprocess paused\n')
672 print '\tNonblocking read of paused subprocess (should be empty):'
673 print p.readPendingChars()
674 print '\tContinuing subprocess (verbose):'
675 if not p.cont(1): # verbose continue
676 print '\t** Continue seems to have failed! Probly lost subproc...'
679 print '\tReading accumulated line, blocking read:'
681 print "\tDeleting subproc, which was set to die noisily:"