--- /dev/null
+"""Run a subprocess and communicate with it via stdin, stdout, and stderr.
+
+Requires that platform supports, eg, posix-style os.pipe and os.fork.
+
+Subprocess class features:
+
+ - provides non-blocking stdin and stderr reads
+
+ - provides subprocess stop and continue, kill-on-deletion
+
+ - provides detection of subprocess startup failure
+
+ - Subprocess objects have nice, informative string rep (as every good object
+ ought).
+
+ - RecordFile class provides record-oriented IO for file-like stream objects.
+"""
+
+__version__ = "Revision: 1.7 "
+
+# Id: subproc.py,v 1.7 1998
+# Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
+
+# Prior art: Initially based python code examples demonstrating usage of pipes
+# and subprocesses, primarily one by jose pereira.
+
+# Implementation notes:
+# - I'm not using the fcntl module to implement non-blocking file descriptors,
+# because i don't know what all in it is portable and what is not. I'm not
+# about to provide for different platform contingencies - at that extent, the
+# effort would be better spent hacking 'expect' into python.
+# - Todo? - Incorporate an error-output handler approach, where error output is
+# checked on regular IO, when a handler is defined, and passed to the
+# handler (eg for printing) immediately as it shows...
+# - Detection of failed subprocess startup is a gross kludge, at present.
+
+# - new additions (1.3, 1.4):
+# - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
+# reads based solely on os.read with select, while capitalizing big-time on
+# multi-char read chunking.
+# - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
+#
+# ken.manheimer@nist.gov
+
+
+import sys, os, string, time, types
+import select
+import signal
+
+
+SubprocessError = 'SubprocessError'
+# You may need to increase execvp_grace_seconds, if you have a large or slow
+# path to search:
+execvp_grace_seconds = 0.5
+
+class Subprocess:
+ """Run and communicate asynchronously with a subprocess.
+
+ Provides non-blocking reads in the form of .readPendingChars and
+ .readPendingLine.
+
+ .readline will block until it gets a complete line.
+
+ .peekPendingChar does a non-blocking, non-consuming read for pending
+ output, and can be used before .readline to check non-destructively for
+ pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
+ a new character is pending, or timeout secs pass, with granularity of
+ pollPause seconds.
+
+ There are corresponding read and peekPendingErrXXX routines, to read from
+ the subprocess stderr stream."""
+
+ pid = 0
+ cmd = ''
+ expire_noisily = 1 # Announce subproc destruction?
+ pipefiles = []
+ readbuf = 0 # fork will assign to be a readbuf obj
+ errbuf = 0 # fork will assign to be a readbuf obj
+
+ def __init__(self, cmd, control_stderr=0, expire_noisily=0,
+ in_fd=0, out_fd=1, err_fd=2):
+ """Launch a subprocess, given command string COMMAND."""
+ self.cmd = cmd
+ self.pid = 0
+ self.expire_noisily = expire_noisily
+ self.control_stderr = control_stderr
+ self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
+ self.fork()
+
+ def fork(self, cmd=None):
+ """Fork a subprocess with designated COMMAND (default, self.cmd)."""
+ if cmd: self.cmd = cmd
+ else: cmd = self.cmd
+ pRc, cWp = os.pipe() # parent-read-child, child-write-parent
+ cRp, pWc = os.pipe() # child-read-parent, parent-write-child
+ pRe, cWe = os.pipe() # parent-read-error, child-write-error
+ self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
+
+ self.pid = os.fork()
+
+ if self.pid == 0: #### CHILD ####
+ parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
+ # Reopen stdin, out, err, on pipe ends:
+ os.dup2(cRp, self.in_fd) # cRp = sys.stdin
+ os.dup2(cWp, self.out_fd) # cWp = sys.stdout
+ if self.control_stderr:
+ os.dup2(cWe, self.err_fd) # cWe = sys.stderr
+ # Ensure (within reason) stray file descriptors are closed:
+ excludes = [self.in_fd, self.out_fd, self.err_fd]
+ for i in range(4,100):
+ if i not in excludes:
+ try: os.close(i)
+ except os.error: pass
+
+ self.run_cmd(cmd)
+ os._exit(1) # Shouldn't get here.
+
+ else: ### PARENT ###
+ # Connect to the child's file descriptors, using our customized
+ # fdopen:
+ self.toChild = os.fdopen(pWc, 'w')
+ self.toChild_fdlist = [pWc]
+ self.readbuf = ReadBuf(pRc)
+ self.errbuf = ReadBuf(pRe)
+ time.sleep(execvp_grace_seconds)
+ try:
+ pid, err = os.waitpid(self.pid, os.WNOHANG)
+ except os.error, (errno, msg):
+ if errno == 10:
+ raise SubprocessError, \
+ "Subprocess '%s' failed." % self.cmd
+ raise SubprocessError, \
+ "Subprocess failed[%d]: %s" % (errno, msg)
+ if pid == self.pid:
+ # child exited already
+ self.pid == None
+ sig = err & 0xff
+ rc = (err & 0xff00) >> 8
+ if sig:
+ raise SubprocessError, (
+ "child killed by signal %d with a return code of %d"
+ % (sig, rc))
+ if rc:
+ raise SubprocessError, \
+ "child exited with return code %d" % rc
+ # Child may have exited, but not in error, so we won't say
+ # anything more at this point.
+
+ def run_cmd(self, cmd):
+ cmd = string.split(self.cmd)
+
+ try:
+ os.execvp(cmd[0], cmd)
+ os._exit(1) # Shouldn't get here
+
+ except os.error, e:
+ if self.control_stderr:
+ os.dup2(parentErr, 2) # Reconnect to parent's stdout
+ sys.stderr.write("**execvp failed, '%s'**\n" %
+ str(e))
+ os._exit(1)
+
+
+ ### Write input to subprocess ###
+
+ def write(self, str):
+ """Write a STRING to the subprocess."""
+
+ if not self.pid:
+ raise SubprocessError, "no child" # ===>
+ if select.select([],self.toChild_fdlist,[],0)[1]:
+ self.toChild.write(str)
+ self.toChild.flush()
+ else:
+ # XXX Can write-buffer full be handled better??
+ raise IOError, "write to %s blocked" % self # ===>
+
+ def writeline(self, line=''):
+ """Write STRING, with added newline termination, to subprocess."""
+ self.write(line + '\n')
+
+ ### Get output from subprocess ###
+
+ def peekPendingChar(self):
+ """Return, but (effectively) do not consume a single pending output
+ char, or return null string if none pending."""
+
+ return self.readbuf.peekPendingChar() # ===>
+ def peekPendingErrChar(self):
+ """Return, but (effectively) do not consume a single pending output
+ char, or return null string if none pending."""
+
+ return self.errbuf.peekPendingChar() # ===>
+
+ def waitForPendingChar(self, timeout, pollPause=.1):
+ """Block max TIMEOUT secs until we peek a pending char, returning the
+ char, or '' if none encountered.
+
+ Pause POLLPAUSE secs (default .1) between polls."""
+
+ accume = 0
+ while 1:
+ nextChar = self.readbuf.peekPendingChar()
+ if nextChar or (accume > timeout): return nextChar
+ time.sleep(pollPause)
+ accume = accume + pollPause
+
+ def read(self, n=None):
+ """Read N chars, or all pending if no N specified."""
+ if n == None:
+ return self.readPendingChars()
+ got = ''
+ while n:
+ got0 = self.readPendingChars(n)
+ got = got + got0
+ n = n - len(got0)
+ return got
+ def readPendingChars(self, max=None):
+ """Read all currently pending subprocess output as a single string."""
+ return self.readbuf.readPendingChars(max)
+ def readPendingErrChars(self):
+ """Read all currently pending subprocess error output as a single
+ string."""
+ if self.control_stderr:
+ return self.errbuf.readPendingChars()
+ else:
+ raise SubprocessError, "Haven't grabbed subprocess error stream."
+
+ def readPendingLine(self):
+ """Read currently pending subprocess output, up to a complete line
+ (newline inclusive)."""
+ return self.readbuf.readPendingLine()
+ def readPendingErrLine(self):
+ """Read currently pending subprocess error output, up to a complete
+ line (newline inclusive)."""
+ if self.control_stderr:
+ return self.errbuf.readPendingLine()
+ else:
+ raise SubprocessError, "Haven't grabbed subprocess error stream."
+
+ def readline(self):
+ """Return next complete line of subprocess output, blocking until
+ then."""
+ return self.readbuf.readline()
+ def readlineErr(self):
+ """Return next complete line of subprocess error output, blocking until
+ then."""
+ if self.control_stderr:
+ return self.errbuf.readline()
+ else:
+ raise SubprocessError, "Haven't grabbed subprocess error stream."
+
+ ### Subprocess Control ###
+
+ def active(self):
+ """True if subprocess is alive and kicking."""
+ return self.status(boolean=1)
+ def status(self, boolean=0):
+ """Return string indicating whether process is alive or dead."""
+ active = 0
+ if not self.cmd:
+ status = 'sans command'
+ elif not self.pid:
+ status = 'sans process'
+ elif not self.cont():
+ status = "(unresponding) '%s'" % self.cmd
+ else:
+ status = "'%s'" % self.cmd
+ active = 1
+ if boolean:
+ return active
+ else:
+ return status
+
+ def stop(self, verbose=1):
+ """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
+ otherwise."""
+ try:
+ os.kill(self.pid, signal.SIGSTOP)
+ except os.error:
+ if verbose:
+ print "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)
+ return 0
+ if verbose: print "Stopped '%s'" % self.cmd
+ return 'stopped'
+
+ def cont(self, verbose=0):
+ """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
+ otherwise."""
+ try:
+ os.kill(self.pid, signal.SIGCONT)
+ except os.error:
+ if verbose:
+ print ("Continue failed for '%s' - '%s'" %
+ (self.cmd, sys.exc_value))
+ return 0
+ if verbose: print "Continued '%s'" % self.cmd
+ return 'continued'
+
+ def die(self):
+ """Send process PID signal SIG (default 9, 'kill'), returning None once
+ it is successfully reaped.
+
+ SubprocessError is raised if process is not successfully killed."""
+
+ if not self.pid:
+ raise SubprocessError, "No process" # ===>
+ elif not self.cont():
+ raise SubprocessError, "Can't signal subproc %s" % self # ===>
+
+ # Try sending first a TERM and then a KILL signal.
+ keep_trying = 1
+ sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
+ for sig in sigs:
+ try:
+ os.kill(self.pid, sig[1])
+ except posix.error:
+ # keep trying
+ pass
+ # Try a couple or three times to reap the process with waitpid:
+ for i in range(5):
+ # WNOHANG == 1 on sunos, presumably same elsewhere.
+ if os.waitpid(self.pid, os.WNOHANG):
+ if self.expire_noisily:
+ print ("\n(%s subproc %d '%s' / %s)" %
+ (sig[0], self.pid, self.cmd,
+ hex(id(self))[2:]))
+ for i in self.pipefiles:
+ os.close(i)
+ self.pid = 0
+ return None # ===>
+ time.sleep(.1)
+ # Only got here if subprocess is not gone:
+ raise (SubprocessError,
+ ("Failed kill of subproc %d, '%s', with signals %s" %
+ (self.pid, self.cmd, map(lambda(x): x[0], sigs))))
+
+ def __del__(self):
+ """Terminate the subprocess"""
+ if self.pid:
+ self.die()
+
+ def __repr__(self):
+ status = self.status()
+ return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
+
+# The name of the class is a pun; it is short for "Process", but it also appeals
+# to the word "Procedure"
+class Subproc(Subprocess):
+ def run_cmd(self, cmd):
+ apply(cmd[0], cmd[1:])
+ os._exit(1)
+
+#############################################################################
+##### Non-blocking read operations #####
+#############################################################################
+
+class ReadBuf:
+ """Output buffer for non-blocking reads on selectable files like pipes and
+ sockets. Init with a file descriptor for the file."""
+
+ def __init__(self, fd, maxChunkSize=1024):
+ """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
+ (default 1024)."""
+
+ if fd < 0:
+ raise ValueError
+ self.fd = fd
+ self.eof = 0 # May be set with stuff still in .buf
+ self.buf = ''
+ self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
+
+ def fileno(self):
+ return self.fd
+
+ def peekPendingChar(self):
+ """Return, but don't consume, first character of unconsumed output from
+ file, or empty string if none."""
+
+ if self.buf:
+ return self.buf[0] # ===>
+
+ if self.eof:
+ return '' # ===>
+
+ sel = select.select([self.fd], [], [self.fd], 0)
+ if sel[2]:
+ self.eof = 1
+ if sel[0]:
+ self.buf = os.read(self.fd, self.chunkSize) # ===>
+ return self.buf[0] # Assume select don't lie.
+ else: return '' # ===>
+
+
+ def readPendingChar(self):
+ """Consume first character of unconsumed output from file, or empty
+ string if none."""
+
+ if self.buf:
+ got, self.buf = self.buf[0], self.buf[1:]
+ return got # ===>
+
+ if self.eof:
+ return '' # ===>
+
+ sel = select.select([self.fd], [], [self.fd], 0)
+ if sel[2]:
+ self.eof = 1
+ if sel[0]:
+ return os.read(self.fd, 1) # ===>
+ else: return '' # ===>
+
+ def readPendingChars(self, max=None):
+ """Consume uncomsumed output from FILE, or empty string if nothing
+ pending."""
+
+ got = ""
+ if self.buf:
+ got, self.buf = self.buf, ''
+ return got # ===>
+
+ if self.eof:
+ return ''
+
+ sel = select.select([self.fd], [], [self.fd], 0)
+ if sel[2]:
+ self.eof = 1
+ if sel[0]:
+ got = got + os.read(self.fd, self.chunkSize)
+ if max == 0:
+ self.buf = got
+ return ''
+ elif max == None:
+ return got
+ elif len(got) > max:
+ self.buf = self.buf + got[max:]
+ return got[:max]
+ else:
+ return got
+ else: return ''
+
+ def readPendingLine(self, block=0):
+ """Return pending output from FILE, up to first newline (inclusive).
+
+ Does not block unless optional arg BLOCK is true.
+
+ Note that an error will be raised if a new eof is encountered without
+ any newline."""
+
+ if self.buf:
+ to = string.find(self.buf, '\n')
+ if to != -1:
+ got, self.buf = self.buf[:to+1], self.buf[to+1:]
+ return got # ===>
+ got, self.buf = self.buf, ''
+ else:
+ if self.eof:
+ return '' # ===>
+ got = ''
+
+ # Herein, 'got' contains the (former) contents of the buffer, and it
+ # doesn't include a newline.
+ fdlist = [self.fd]
+ period = block and 1.0 or 0 # don't be too busy while waiting
+ while 1: # (we'll only loop if block set)
+ sel = select.select(fdlist, [], fdlist, period)
+ if sel[2]:
+ self.eof = 1
+ if sel[0]:
+ got = got + os.read(self.fd, self.chunkSize)
+
+ to = string.find(got, '\n')
+ if to != -1:
+ got, self.buf = got[:to+1], got[to+1:]
+ return got # ===>
+ if not block:
+ return got # ===>
+ if self.eof:
+ self.buf = '' # this is how an ordinary file acts...
+ return got
+ # otherwise - no newline, blocking requested, no eof - loop. # ==^
+
+ def readline(self):
+ """Return next output line from file, blocking until it is received."""
+
+ return self.readPendingLine(1) # ===>
+
+
+#############################################################################
+##### Encapsulated reading and writing #####
+#############################################################################
+# Encapsulate messages so the end can be unambiguously identified, even
+# when they contain multiple, possibly empty lines.
+
+class RecordFile:
+ """Encapsulate stream object for record-oriented IO.
+
+ Particularly useful when dealing with non-line oriented communications
+ over pipes, eg with subprocesses."""
+
+ # Message is written preceded by a line containing the message length.
+
+ def __init__(self, f):
+ self.file = f
+
+ def write_record(self, s):
+ "Write so self.read knows exactly how much to read."
+ f = self.__dict__['file']
+ f.write("%s\n%s" % (len(s), s))
+ if hasattr(f, 'flush'):
+ f.flush()
+
+ def read_record(self):
+ "Read and reconstruct message as prepared by self.write."
+ f = self.__dict__['file']
+ line = f.readline()[:-1]
+ if line:
+ try:
+ l = string.atoi(line)
+ except ValueError:
+ raise IOError, ("corrupt %s file structure"
+ % self.__class__.__name__)
+ return f.read(l)
+ else:
+ # EOF.
+ return ''
+
+ def __getattr__(self, attr):
+ """Implement characteristic file object attributes."""
+ f = self.__dict__['file']
+ if hasattr(f, attr):
+ return getattr(f, attr)
+ else:
+ raise AttributeError, attr
+
+ def __repr__(self):
+ return "<%s of %s at %s>" % (self.__class__.__name__,
+ self.__dict__['file'],
+ hex(id(self))[2:])
+
+def record_trial(s):
+ """Exercise encapsulated write/read with an arbitrary string.
+
+ Raise IOError if the string gets distorted through transmission!"""
+ from StringIO import StringIO
+ sf = StringIO()
+ c = RecordFile(sf)
+ c.write(s)
+ c.seek(0)
+ r = c.read()
+ show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
+ if r != s:
+ raise IOError, "String distorted:\n%s" % show
+
+#############################################################################
+##### An example subprocess interfaces #####
+#############################################################################
+
+class Ph:
+ """Convenient interface to CCSO 'ph' nameserver subprocess.
+
+ .query('string...') method takes a query and returns a list of dicts, each
+ of which represents one entry."""
+
+ # Note that i made this a class that handles a subprocess object, rather
+ # than one that inherits from it. I didn't see any functional
+ # disadvantages, and didn't think that full support of the entire
+ # Subprocess functionality was in any way suitable for interaction with
+ # this specialized interface. ? klm 13-Jan-1995
+
+ def __init__(self):
+ try:
+ self.proc = Subprocess('ph', 1)
+ except:
+ raise SubprocessError, ('failure starting ph: %s' % # ===>
+ str(sys.exc_value))
+
+ def query(self, q):
+ """Send a query and return a list of dicts for responses.
+
+ Raise a ValueError if ph responds with an error."""
+
+ self.clear()
+
+ self.proc.writeline('query ' + q)
+ got = []; it = {}
+ while 1:
+ response = self.getreply() # Should get null on new prompt.
+ errs = self.proc.readPendingErrChars()
+ if errs:
+ sys.stderr.write(errs)
+ if it:
+ got.append(it)
+ it = {}
+ if not response:
+ return got # ===>
+ elif type(response) == types.StringType:
+ raise ValueError, "ph failed match: '%s'" % response # ===>
+ for line in response:
+ # convert to a dict:
+ line = string.splitfields(line, ':')
+ it[string.strip(line[0])] = (
+ string.strip(string.join(line[1:])))
+
+ def getreply(self):
+ """Consume next response from ph, returning list of lines or string
+ err."""
+ # Key on first char: (First line may lack newline.)
+ # - dash discard line
+ # - 'ph> ' conclusion of response
+ # - number error message
+ # - whitespace beginning of next response
+
+ nextChar = self.proc.waitForPendingChar(60)
+ if not nextChar:
+ raise SubprocessError, 'ph subprocess not responding' # ===>
+ elif nextChar == '-':
+ # dashed line - discard it, and continue reading:
+ self.proc.readline()
+ return self.getreply() # ===>
+ elif nextChar == 'p':
+ # 'ph> ' prompt - don't think we should hit this, but what the hay:
+ return '' # ===>
+ elif nextChar in '0123456789':
+ # Error notice - we're currently assuming single line errors:
+ return self.proc.readline()[:-1] # ===>
+ elif nextChar in ' \t':
+ # Get content, up to next dashed line:
+ got = []
+ while nextChar != '-' and nextChar != '':
+ got.append(self.proc.readline()[:-1])
+ nextChar = self.proc.peekPendingChar()
+ return got
+ def __repr__(self):
+ return "<Ph instance, %s at %s>\n" % (self.proc.status(),
+ hex(id(self))[2:])
+ def clear(self):
+ """Clear-out initial preface or residual subproc input and output."""
+ pause = .5; maxIter = 10 # 5 seconds to clear
+ iterations = 0
+ got = ''
+ self.proc.write('')
+ while iterations < maxIter:
+ got = got + self.proc.readPendingChars()
+ # Strip out all but the last incomplete line:
+ got = string.splitfields(got, '\n')[-1]
+ if got == 'ph> ': return # Ok. ===>
+ time.sleep(pause)
+ raise SubprocessError, ('ph not responding within %s secs' %
+ pause * maxIter)
+
+#############################################################################
+##### Test #####
+#############################################################################
+
+def test(p=0):
+ print "\tOpening subprocess:"
+ p = Subprocess('cat', 1) # set to expire noisily...
+ print p
+ print "\tOpening bogus subprocess, should fail:"
+ try:
+ b = Subprocess('/', 1)
+ print "\tOops! Null-named subprocess startup *succeeded*?!?"
+ except SubprocessError:
+ print "\t...yep, it failed."
+ print '\tWrite, then read, two newline-teriminated lines, using readline:'
+ p.write('first full line written\n'); p.write('second.\n')
+ print `p.readline()`
+ print `p.readline()`
+ print '\tThree lines, last sans newline, read using combination:'
+ p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
+ print '\tFirst line via readline:'
+ print `p.readline()`
+ print '\tRest via readPendingChars:'
+ print p.readPendingChars()
+ print "\tStopping then continuing subprocess (verbose):"
+ if not p.stop(1): # verbose stop
+ print '\t** Stop seems to have failed!'
+ else:
+ print '\tWriting line while subprocess is paused...'
+ p.write('written while subprocess paused\n')
+ print '\tNonblocking read of paused subprocess (should be empty):'
+ print p.readPendingChars()
+ print '\tContinuing subprocess (verbose):'
+ if not p.cont(1): # verbose continue
+ print '\t** Continue seems to have failed! Probly lost subproc...'
+ return p
+ else:
+ print '\tReading accumulated line, blocking read:'
+ print p.readline()
+ print "\tDeleting subproc, which was set to die noisily:"
+ del p
+ print "\tDone."
+ return None