"""Run a subprocess and communicate with it via stdin, stdout, and stderr.

Requires that platform supports, eg, posix-style os.pipe and os.fork.

Subprocess class features:

 - provides non-blocking stdout and stderr reads

 - provides subprocess stop and continue, kill-on-deletion

 - provides detection of subprocess startup failure

 - Subprocess objects have nice, informative string rep (as every good object
   ought).

 - RecordFile class provides record-oriented IO for file-like stream objects.
"""

# Provenance: recovered from the gitweb blob diff of subproc.py in
# bookmarks_db.git (https://git.phdru.name/), blob 39f82a6.

__version__ = "Revision: 1.7 "

# Id: subproc.py,v 1.7 1998
# Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.

# Prior art: Initially based on python code examples demonstrating usage of
# pipes and subprocesses, primarily one by jose pereira.

# Implementation notes:
# - I'm not using the fcntl module to implement non-blocking file descriptors,
#   because i don't know what all in it is portable and what is not.  I'm not
#   about to provide for different platform contingencies - at that extent, the
#   effort would be better spent hacking 'expect' into python.
# - Todo? - Incorporate an error-output handler approach, where error output is
#   checked on regular IO, when a handler is defined, and passed to the
#   handler (eg for printing) immediately as it shows...
# - Detection of failed subprocess startup is a gross kludge, at present.

# - new additions (1.3, 1.4):
#   - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
#     reads based solely on os.read with select, while capitalizing big-time on
#     multi-char read chunking.
#   - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
#
# ken.manheimer@nist.gov

import codecs
import errno
import os
import select
import signal
import string  # noqa: F401 - imported by the original module, kept for importers
import sys
import time
import traceback
import types  # noqa: F401 - imported by the original module, kept for importers


class SubprocessError(Exception):
    """Raised when a subprocess cannot be started, signalled, or reaped."""


# You may need to increase execvp_grace_seconds, if you have a large or slow
# path to search:
execvp_grace_seconds = 0.5


def _exit_child(status):
    """Flush stdio (best effort) and leave a forked child without cleanup.

    os._exit skips interpreter shutdown, so buffered output would otherwise
    be lost.
    """
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:
            pass
    os._exit(status)


def _chomp(line):
    """Return LINE without one trailing newline, if it has one."""
    if line.endswith('\n'):
        return line[:-1]
    return line


class Subprocess:
    """Run and communicate asynchronously with a subprocess.

    Provides non-blocking reads in the form of .readPendingChars and
    .readPendingLine.

    .readline will block until it gets a complete line.

    .peekPendingChar does a non-blocking, non-consuming read for pending
    output, and can be used before .readline to check non-destructively for
    pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
    a new character is pending, or timeout secs pass, with granularity of
    pollPause seconds.

    There are corresponding read and peekPendingErrXXX routines, to read from
    the subprocess stderr stream."""

    pid = 0
    cmd = ''
    expire_noisily = 1                  # Announce subproc destruction?
    pipefiles = []                      # Always rebound, never mutated in place
    readbuf = 0                         # fork will assign to be a readbuf obj
    errbuf = 0                          # fork will assign to be a readbuf obj
    toChild = None                      # fork will assign the child's stdin stream

    def __init__(self, cmd, control_stderr=0, expire_noisily=0,
                 in_fd=0, out_fd=1, err_fd=2):
        """Launch a subprocess, given command string COMMAND.

        CONTROL_STDERR true redirects the child's stderr onto a pipe readable
        through the ...Err... methods.  EXPIRE_NOISILY true makes .die print
        a notice.  IN_FD, OUT_FD and ERR_FD are the descriptor numbers the
        child sees as its stdin, stdout and stderr.

        Raises SubprocessError if the child is found to have failed within
        execvp_grace_seconds of starting."""
        self.cmd = cmd
        self.pid = 0
        self.expire_noisily = expire_noisily
        self.control_stderr = control_stderr
        self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
        self.fork()

    def fork(self, cmd=None):
        """Fork a subprocess with designated COMMAND (default, self.cmd).

        Raises SubprocessError if the child exits with a nonzero status, or
        is killed by a signal, within execvp_grace_seconds."""
        if cmd:
            self.cmd = cmd
        else:
            cmd = self.cmd
        pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
        cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
        pRe, cWe = os.pipe()            # parent-read-error, child-write-error
        self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]

        # Flush first, so the child does not inherit (and later re-emit into
        # our pipes) output the parent has buffered but not yet written.
        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()
            except Exception:
                pass

        try:
            self.pid = os.fork()
        except OSError:
            self._close_pipes()
            raise

        if self.pid == 0:               #### CHILD ####
            try:
                # Preserve handle on *parent* stderr, for reporting a failed
                # exec.  run_cmd finds it on the instance.
                self._parent_err = os.dup(self.err_fd)
                # Reopen stdin, out, err, on pipe ends:
                os.dup2(cRp, self.in_fd)        # cRp = sys.stdin
                os.dup2(cWp, self.out_fd)       # cWp = sys.stdout
                if self.control_stderr:
                    os.dup2(cWe, self.err_fd)   # cWe = sys.stderr
                # Ensure (within reason) stray file descriptors are closed:
                keep = (self.in_fd, self.out_fd, self.err_fd,
                        self._parent_err)
                for fd in range(3, 100):
                    if fd not in keep:
                        try:
                            os.close(fd)
                        except OSError:
                            pass
                self.run_cmd(cmd)
            except Exception:
                traceback.print_exc()
            finally:
                # Never let the child fall back into the caller's code.
                _exit_child(1)

        else:                           ### PARENT ###
            # Drop the child's ends: while we hold a write end of the child's
            # stdout/stderr pipes we can never see end-of-file on them.
            for fd in (cWp, cRp, cWe):
                os.close(fd)
            self.pipefiles = [pRc, pWc, pRe]
            # closefd=False: the descriptor is closed exactly once, through
            # self.pipefiles, rather than a second time by the file object.
            self.toChild = os.fdopen(pWc, 'w', encoding='utf-8',
                                     closefd=False)
            self.toChild_fdlist = [pWc]
            self.readbuf = ReadBuf(pRc)
            self.errbuf = ReadBuf(pRe)
            time.sleep(execvp_grace_seconds)
            try:
                pid, err = os.waitpid(self.pid, os.WNOHANG)
            except OSError as e:
                self.pid = 0
                self._close_pipes()
                if e.errno == errno.ECHILD:
                    raise SubprocessError(
                        "Subprocess '%s' failed." % (self.cmd,))
                raise SubprocessError(
                    "Subprocess failed[%d]: %s" % (e.errno, e.strerror))
            if pid == self.pid:
                # child exited already, and has just been reaped
                self.pid = 0
                sig = err & 0x7f        # 0x80 is the core-dump flag
                rc = (err & 0xff00) >> 8
                if sig:
                    self._close_pipes()
                    raise SubprocessError(
                        "child killed by signal %d with a return code of %d"
                        % (sig, rc))
                if rc:
                    self._close_pipes()
                    raise SubprocessError(
                        "child exited with return code %d" % rc)
                # Child may have exited, but not in error, so we won't say
                # anything more at this point.  Its output stays readable.

    def run_cmd(self, cmd):
        """Run in the child: exec whitespace-split CMD.  Never returns.

        If the exec fails, the failure is reported on the parent's stderr
        and the child exits with status 1."""
        try:
            argv = cmd.split()
            os.execvp(argv[0], argv)
        except (OSError, IndexError) as e:     # IndexError: empty command
            parent_err = getattr(self, '_parent_err', None)
            if self.control_stderr and parent_err is not None:
                os.dup2(parent_err, self.err_fd)  # Reconnect to parent's stderr
            sys.stderr.write("**execvp failed, '%s'**\n" % (e,))
        _exit_child(1)

    def _close_pipes(self):
        """Close every pipe end the parent still holds, each exactly once.

        Already-buffered output stays readable afterwards; the read buffers
        are marked at eof so they never poll a closed descriptor."""
        to_child, self.toChild = self.toChild, None
        if to_child is not None:
            try:
                to_child.close()        # flushes; descriptor closed below
            except (OSError, ValueError):
                pass
        fds, self.pipefiles = self.pipefiles, []
        for fd in fds:
            try:
                os.close(fd)
            except OSError:
                pass
        for buf in (self.readbuf, self.errbuf):
            if buf:
                buf.eof = 1

    ### Write input to subprocess ###

    def write(self, str):
        """Write a STRING to the subprocess.

        Raises SubprocessError if there is no child, IOError if the pipe to
        the child is not currently writable."""

        if not self.pid:
            raise SubprocessError("no child")                           # ===>
        if select.select([], self.toChild_fdlist, [], 0)[1]:
            self.toChild.write(str)
            self.toChild.flush()
        else:
            # XXX Can write-buffer full be handled better??
            raise IOError("write to %s blocked" % self)                 # ===>

    def writeline(self, line=''):
        """Write STRING, with added newline termination, to subprocess."""
        self.write(line + '\n')

    ### Get output from subprocess ###

    def peekPendingChar(self):
        """Return, but (effectively) do not consume a single pending output
        char, or return null string if none pending."""

        return self.readbuf.peekPendingChar()                           # ===>

    def peekPendingErrChar(self):
        """Return, but (effectively) do not consume a single pending error
        output char, or return null string if none pending."""

        return self.errbuf.peekPendingChar()                            # ===>

    def waitForPendingChar(self, timeout, pollPause=.1):
        """Block max TIMEOUT secs until we peek a pending char, returning the
        char, or '' if none encountered.

        Pause POLLPAUSE secs (default .1) between polls.  Returns '' at once
        when the output stream has reached end-of-file."""

        deadline = time.monotonic() + timeout
        while 1:
            nextChar = self.readbuf.peekPendingChar()
            if nextChar or self.readbuf.eof or time.monotonic() > deadline:
                return nextChar
            time.sleep(pollPause)

    def read(self, n=None):
        """Read N chars, or all pending if no N specified.

        With N given, blocks until N chars have arrived or the output stream
        reaches end-of-file, in which case fewer than N are returned."""
        if n is None:
            return self.readPendingChars()
        pieces = []
        remaining = n
        while remaining > 0:
            piece = self.readPendingChars(remaining)
            if piece:
                pieces.append(piece)
                remaining = remaining - len(piece)
            elif self.readbuf.eof:
                break
            else:
                # Nothing pending: sleep in select instead of spinning.
                select.select([self.readbuf.fd], [], [], 1.0)
        return ''.join(pieces)

    def readPendingChars(self, max=None):
        """Read currently pending subprocess output as a single string, at
        most MAX chars if MAX is given."""
        return self.readbuf.readPendingChars(max)

    def readPendingErrChars(self):
        """Read all currently pending subprocess error output as a single
        string."""
        if self.control_stderr:
            return self.errbuf.readPendingChars()
        else:
            raise SubprocessError("Haven't grabbed subprocess error stream.")

    def readPendingLine(self):
        """Read currently pending subprocess output, up to a complete line
        (newline inclusive)."""
        return self.readbuf.readPendingLine()

    def readPendingErrLine(self):
        """Read currently pending subprocess error output, up to a complete
        line (newline inclusive)."""
        if self.control_stderr:
            return self.errbuf.readPendingLine()
        else:
            raise SubprocessError("Haven't grabbed subprocess error stream.")

    def readline(self):
        """Return next complete line of subprocess output, blocking until
        then."""
        return self.readbuf.readline()

    def readlineErr(self):
        """Return next complete line of subprocess error output, blocking until
        then."""
        if self.control_stderr:
            return self.errbuf.readline()
        else:
            raise SubprocessError("Haven't grabbed subprocess error stream.")

    ### Subprocess Control ###

    def active(self):
        """True if subprocess is alive and kicking."""
        return self.status(boolean=1)

    def status(self, boolean=0):
        """Return string indicating whether process is alive or dead.

        With BOOLEAN true, return 1 or 0 instead.  Liveness is probed by
        sending SIGCONT, so a stopped subprocess is resumed by this call."""
        active = 0
        if not self.cmd:
            status = 'sans command'
        elif not self.pid:
            status = 'sans process'
        elif not self.cont():
            status = "(unresponding) '%s'" % (self.cmd,)
        else:
            status = "'%s'" % (self.cmd,)
            active = 1
        if boolean:
            return active
        else:
            return status

    def stop(self, verbose=1):
        """Signal subprocess with SIGSTOP, returning 'stopped' if ok, or 0
        otherwise."""
        try:
            if not self.pid:
                # kill(0, ...) would signal our own process group.
                raise OSError(errno.ESRCH, 'no process')
            os.kill(self.pid, signal.SIGSTOP)
        except OSError as e:
            if verbose:
                print("Stop failed for '%s' - '%s'" % (self.cmd, e))
            return 0
        if verbose:
            print("Stopped '%s'" % (self.cmd,))
        return 'stopped'

    def cont(self, verbose=0):
        """Signal subprocess with SIGCONT, returning 'continued' if ok, or 0
        otherwise."""
        try:
            if not self.pid:
                # kill(0, ...) would signal our own process group.
                raise OSError(errno.ESRCH, 'no process')
            os.kill(self.pid, signal.SIGCONT)
        except OSError as e:
            if verbose:
                print("Continue failed for '%s' - '%s'" % (self.cmd, e))
            return 0
        if verbose:
            print("Continued '%s'" % (self.cmd,))
        return 'continued'

    def die(self):
        """Terminate the subprocess, first with SIGTERM and then, if that
        does not work, with SIGKILL, returning None once it is reaped.

        SubprocessError is raised if process is not successfully killed."""

        if not self.pid:
            raise SubprocessError("No process")                         # ===>
        elif not self.cont():
            raise SubprocessError("Can't signal subproc %s" % self)     # ===>

        # Try sending first a TERM and then a KILL signal.
        sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
        for sig in sigs:
            try:
                os.kill(self.pid, sig[1])
            except OSError:
                # keep trying
                pass
            # Try a couple or three times to reap the process with waitpid:
            for i in range(5):
                try:
                    # (0, 0) means "still running"; compare the pid rather
                    # than testing the (always true) tuple.
                    reaped = os.waitpid(self.pid, os.WNOHANG)[0]
                except ChildProcessError:
                    reaped = self.pid   # somebody else already reaped it
                if reaped == self.pid:
                    if self.expire_noisily:
                        print("\n(%s subproc %d '%s' / %s)" %
                              (sig[0], self.pid, self.cmd,
                               hex(id(self))[2:]))
                    self._close_pipes()
                    self.pid = 0
                    return None                                         # ===>
                time.sleep(.1)
        # Only got here if subprocess is not gone:
        raise SubprocessError(
            "Failed kill of subproc %d, '%s', with signals %s" %
            (self.pid, self.cmd, [name for name, signum in sigs]))

    def __del__(self):
        """Terminate the subprocess and release its pipes."""
        try:
            if self.pid:
                self.die()
        finally:
            self._close_pipes()

    def __repr__(self):
        status = self.status()
        return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'


# The name of the class is a pun; it is short for "Process", but it also appeals
# to the word "Procedure"
class Subproc(Subprocess):
    """Subprocess whose child runs a Python callable instead of a program.

    The command is a sequence: the callable followed by its arguments."""

    def run_cmd(self, cmd):
        """Run in the child: call cmd[0](*cmd[1:]).  Never returns.

        The child exits 0 when the callable returns and 1 when it raises, so
        a quick, successful callable is not mistaken for a startup failure."""
        status = 1
        try:
            cmd[0](*cmd[1:])
            status = 0
        except Exception:
            traceback.print_exc()
        finally:
            _exit_child(status)


#############################################################################
#####                 Non-blocking read operations                      #####
#############################################################################

class ReadBuf:
    """Output buffer for non-blocking reads on selectable files like pipes and
    sockets.  Init with a file descriptor for the file."""

    def __init__(self, fd, maxChunkSize=1024, encoding='utf-8'):
        """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
        (default 1024).

        Bytes read are decoded as ENCODING; undecodable bytes are replaced
        rather than raising.  Raises ValueError for a negative FD."""

        if fd < 0:
            raise ValueError("invalid file descriptor: %r" % (fd,))
        self.fd = fd
        self.eof = 0                    # May be set with stuff still in .buf
        self.buf = ''
        self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
        # Incremental, because a multi-byte character may straddle two reads.
        self._decoder = codecs.getincrementaldecoder(encoding)(
            errors='replace')

    def fileno(self):
        return self.fd

    def _fill(self, timeout=0):
        """Wait up to TIMEOUT secs for input and append one chunk to .buf.

        Sets .eof when the read returns nothing, or when select reports an
        exceptional condition on the descriptor."""
        readable, _, broken = select.select([self.fd], [], [self.fd], timeout)
        if broken:
            self.eof = 1
        if readable:
            chunk = os.read(self.fd, self.chunkSize)
            if chunk:
                self.buf = self.buf + self._decoder.decode(chunk)
            else:
                # An empty read on a readable descriptor is end-of-file.
                self.eof = 1
                self.buf = self.buf + self._decoder.decode(b'', final=True)

    def _consume(self, count):
        """Remove and return the first COUNT chars of .buf."""
        got, self.buf = self.buf[:count], self.buf[count:]
        return got

    def peekPendingChar(self):
        """Return, but don't consume, first character of unconsumed output from
        file, or empty string if none."""

        if not self.buf and not self.eof:
            self._fill(0)
        return self.buf[:1]                                             # ===>

    def readPendingChar(self):
        """Consume first character of unconsumed output from file, or empty
        string if none."""

        if not self.buf and not self.eof:
            self._fill(0)
        return self._consume(1)                                         # ===>

    def readPendingChars(self, max=None):
        """Consume unconsumed output from FILE, or empty string if nothing
        pending.

        With MAX given, at most MAX chars are returned and the rest stays
        buffered; MAX of 0 (or less) returns '' and consumes nothing."""

        if not self.buf and not self.eof:
            self._fill(0)
        if max is None:
            return self._consume(len(self.buf))
        return self._consume(max if max > 0 else 0)

    def readPendingLine(self, block=0):
        """Return pending output from FILE, up to first newline (inclusive).

        Does not block unless optional arg BLOCK is true.

        Without BLOCK, whatever is pending is returned (and consumed) even if
        it is not yet a complete line.  With BLOCK, an unterminated final
        line is returned once end-of-file is reached; after that, ''."""

        period = 1.0 if block else 0    # don't be too busy while waiting
        polled = 0
        while 1:                        # (we'll only loop if block set)
            to = self.buf.find('\n')
            if to != -1:
                return self._consume(to + 1)                            # ===>
            if self.eof or (polled and not block):
                return self._consume(len(self.buf))                     # ===>
            self._fill(period)
            polled = 1
            # otherwise - no newline, blocking requested, no eof - loop. # ==^

    def readline(self):
        """Return next output line from file, blocking until it is received."""

        return self.readPendingLine(1)                                  # ===>


#############################################################################
#####                 Encapsulated reading and writing                  #####
#############################################################################
# Encapsulate messages so the end can be unambiguously identified, even
# when they contain multiple, possibly empty lines.

class RecordFile:
    """Encapsulate stream object for record-oriented IO.

    Particularly useful when dealing with non-line oriented communications
    over pipes, eg with subprocesses."""

    # Message is written preceded by a line containing the message length.

    def __init__(self, f):
        self.file = f

    def write_record(self, s):
        "Write so self.read_record knows exactly how much to read."
        f = self.__dict__['file']
        f.write("%s\n%s" % (len(s), s))
        if hasattr(f, 'flush'):
            f.flush()

    def read_record(self):
        """Read and reconstruct message as prepared by self.write_record.

        Returns '' at end of file.  Raises IOError if the length line is not
        a non-negative integer."""
        f = self.__dict__['file']
        line = _chomp(f.readline())
        if not line:
            # EOF.
            return ''
        try:
            length = int(line)
        except ValueError:
            length = -1
        if length < 0:
            # A negative count would make f.read slurp the rest of the file.
            raise IOError("corrupt %s file structure"
                          % self.__class__.__name__)
        return f.read(length)

    def __getattr__(self, attr):
        """Implement characteristic file object attributes."""
        try:
            f = self.__dict__['file']
        except KeyError:
            # Not initialised (eg while being copied): behave like any
            # other missing attribute.
            raise AttributeError(attr) from None
        if hasattr(f, attr):
            return getattr(f, attr)
        else:
            raise AttributeError(attr)

    def __repr__(self):
        return "<%s of %s at %s>" % (self.__class__.__name__,
                                     self.__dict__['file'],
                                     hex(id(self))[2:])


def record_trial(s):
    """Exercise encapsulated write/read with an arbitrary string.

    Raise IOError if the string gets distorted through transmission!"""
    from io import StringIO
    sf = StringIO()
    c = RecordFile(sf)
    c.write_record(s)
    c.seek(0)
    r = c.read_record()
    show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
    if r != s:
        raise IOError("String distorted:\n%s" % show)


#############################################################################
#####                   An example subprocess interfaces                #####
#############################################################################

class Ph:
    """Convenient interface to CCSO 'ph' nameserver subprocess.

    .query('string...') method takes a query and returns a list of dicts, each
    of which represents one entry."""

    # Note that i made this a class that handles a subprocess object, rather
    # than one that inherits from it.  I didn't see any functional
    # disadvantages, and didn't think that full support of the entire
    # Subprocess functionality was in any way suitable for interaction with
    # this specialized interface.  ? klm 13-Jan-1995

    def __init__(self):
        try:
            self.proc = Subprocess('ph', 1)
        except Exception as e:
            raise SubprocessError('failure starting ph: %s' % (e,)) from e

    def query(self, q):
        """Send a query and return a list of dicts for responses.

        Raise a ValueError if ph responds with an error."""

        self.clear()

        self.proc.writeline('query ' + q)
        got = []
        it = {}
        while 1:
            response = self.getreply()  # Should get null on new prompt.
            errs = self.proc.readPendingErrChars()
            if errs:
                sys.stderr.write(errs)
            if it:
                got.append(it)
                it = {}
            if not response:
                return got                                              # ===>
            elif isinstance(response, str):
                raise ValueError("ph failed match: '%s'" % response)    # ===>
            for line in response:
                # convert to a dict; only the first colon separates the
                # field name, so colons inside the value survive:
                key, _, value = line.partition(':')
                it[key.strip()] = value.strip()

    def getreply(self):
        """Consume next response from ph, returning list of lines or string
        err."""
        # Key on first char:  (First line may lack newline.)
        #  - dash       discard line
        #  - 'ph> '     conclusion of response
        #  - number     error message
        #  - whitespace beginning of next response

        while 1:
            nextChar = self.proc.waitForPendingChar(60)
            if not nextChar:
                raise SubprocessError('ph subprocess not responding')   # ===>
            elif nextChar == '-':
                # dashed line - discard it, and continue reading:
                self.proc.readline()
                continue
            elif nextChar == 'p':
                # 'ph> ' prompt - don't think we should hit this, but what
                # the hay:
                return ''                                               # ===>
            elif nextChar in '0123456789':
                # Error notice - we're currently assuming single line errors:
                return _chomp(self.proc.readline())                     # ===>
            elif nextChar in ' \t':
                # Get content, up to next dashed line:
                got = []
                while nextChar != '-' and nextChar != '':
                    got.append(_chomp(self.proc.readline()))
                    nextChar = self.proc.peekPendingChar()
                return got
            # Anything else is left unread; the caller sees a null reply.
            return None

    def __repr__(self):
        return "<Ph instance, %s at %s>\n" % (self.proc.status(),
                                              hex(id(self))[2:])

    def clear(self):
        """Clear-out initial preface or residual subproc input and output.

        Raises SubprocessError if no 'ph> ' prompt shows up in time."""
        pause = .5
        maxIter = 10                    # 5 seconds to clear
        got = ''
        self.proc.write('')
        for iterations in range(maxIter):
            got = got + self.proc.readPendingChars()
            # Strip out all but the last incomplete line:
            got = got.split('\n')[-1]
            if got == 'ph> ':
                return                  # Ok.                             ===>
            time.sleep(pause)
        raise SubprocessError('ph not responding within %s secs' %
                              (pause * maxIter))


#############################################################################
#####                             Test                                  #####
#############################################################################

def test(p=0):
    """Interactive demonstration; needs a 'cat' program on the path.

    Returns the subprocess if it could not be continued, else None."""
    print("\tOpening subprocess:")
    # set to expire noisily...
    p = Subprocess('cat', control_stderr=1, expire_noisily=1)
    print(p)
    print("\tOpening bogus subprocess, should fail:")
    try:
        b = Subprocess('/', 1)
        print("\tOops!  Null-named subprocess startup *succeeded*?!?")
    except SubprocessError:
        print("\t...yep, it failed.")
    print('\tWrite, then read, two newline-terminated lines, using readline:')
    p.write('first full line written\n')
    p.write('second.\n')
    print(repr(p.readline()))
    print(repr(p.readline()))
    print('\tThree lines, last sans newline, read using combination:')
    p.write('first\n')
    p.write('second\n')
    p.write('third, (no cr)')
    print('\tFirst line via readline:')
    print(repr(p.readline()))
    print('\tRest via readPendingChars:')
    print(p.readPendingChars())
    print("\tStopping then continuing subprocess (verbose):")
    if not p.stop(1):                   # verbose stop
        print('\t** Stop seems to have failed!')
    else:
        print('\tWriting line while subprocess is paused...')
        p.write('written while subprocess paused\n')
        print('\tNonblocking read of paused subprocess (should be empty):')
        print(p.readPendingChars())
        print('\tContinuing subprocess (verbose):')
        if not p.cont(1):               # verbose continue
            print('\t** Continue seems to have failed!  Probly lost subproc...')
            return p
        else:
            print('\tReading accumulated line, blocking read:')
            print(p.readline())
            print("\tDeleting subproc, which was set to die noisily:")
            del p
            print("\tDone.")
            return None


# This is a demonstration that forks real processes, not a unit test: keep
# test runners that collect by name from running it.
test.__test__ = False