X-Git-Url: https://git.phdru.name/?p=bookmarks_db.git;a=blobdiff_plain;f=subproc.py;h=e3c2717da8a4207ba0a099e02d9278bebeda7dd0;hp=fcb7e1e222715bbea0408062dd73da92b5a13692;hb=HEAD;hpb=f61e98fb060b1702bc02a2dddbe8dd49443ec0b6 diff --git a/subproc.py b/subproc.py deleted file mode 100644 index fcb7e1e..0000000 --- a/subproc.py +++ /dev/null @@ -1,688 +0,0 @@ -"""Run a subprocess and communicate with it via stdin, stdout, and stderr. - -Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork' -routines. - -Subprocess class features: - - - provides non-blocking stdin and stderr reads - - - provides subprocess stop and continue, kill-on-deletion - - - provides detection of subprocess startup failure - - - Subprocess objects have nice, informative string rep (as every good object - ought).""" - -__version__ = "Revision: 1.15 " - -# Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp -# Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995. - -# Prior art: Initially based python code examples demonstrating usage of pipes -# and subprocesses, primarily one by jose pereira. - -# Implementation notes: -# - I'm not using the fcntl module to implement non-blocking file descriptors, -# because i don't know what all in it is portable and what is not. I'm not -# about to provide for different platform contingencies - at that extent, the -# effort would be better spent hacking 'expect' into python. -# - Todo? - Incorporate an error-output handler approach, where error output is -# checked on regular IO, when a handler is defined, and passed to the -# handler (eg for printing) immediately as it shows... -# - Detection of failed subprocess startup is a gross kludge, at present. - -# - new additions (1.3, 1.4): -# - Readbuf, taken from donn cave's iobuf addition, implements non-blocking -# reads based solely on os.read with select, while capitalizing big-time on -# multi-char read chunking. -# - Subproc deletion frees up pipe file descriptors, so they're not exhausted. -# -# ken.manheimer@nist.gov - -# This is a modified version by Oleg Broytman . -# The original version is still preserved at -# https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz - -import sys, os, string, time, types -import select -import signal - - -class SubprocessError(Exception): - pass - -# You may need to increase execvp_grace_seconds, if you have a large or slow -# path to search: -execvp_grace_seconds = 0.5 - -class Subprocess: - """Run and communicate asynchronously with a subprocess. - - Provides non-blocking reads in the form of .readPendingChars and - .readPendingLine. - - .readline will block until it gets a complete line. - - .peekPendingChar does a non-blocking, non-consuming read for pending - output, and can be used before .readline to check non-destructively for - pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until - a new character is pending, or timeout secs pass, with granularity of - pollPause seconds. - - There are corresponding read and peekPendingErrXXX routines, to read from - the subprocess stderr stream.""" - - pid = 0 - cmd = '' - expire_noisily = 1 # Announce subproc destruction? - pipefiles = [] - readbuf = 0 # fork will assign to be a readbuf obj - errbuf = 0 # fork will assign to be a readbuf obj - - def __init__(self, cmd, control_stderr=0, expire_noisily=0, - in_fd=0, out_fd=1, err_fd=2): - """Launch a subprocess, given command string COMMAND.""" - self.cmd = cmd - self.pid = 0 - self.expire_noisily = expire_noisily - self.control_stderr = control_stderr - self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd - self.fork() - - def fork(self, cmd=None): - """Fork a subprocess with designated COMMAND (default, self.cmd).""" - if cmd: self.cmd = cmd - else: cmd = self.cmd - cmd = string.split(self.cmd) - pRc, cWp = os.pipe() # parent-read-child, child-write-parent - cRp, pWc = os.pipe() # child-read-parent, parent-write-child - pRe, cWe = os.pipe() # parent-read-error, child-write-error - self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe] - - self.pid = os.fork() - - if self.pid == 0: #### CHILD #### - parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr - # Reopen stdin, out, err, on pipe ends: - os.dup2(cRp, self.in_fd) # cRp = sys.stdin - os.dup2(cWp, self.out_fd) # cWp = sys.stdout - if self.control_stderr: - os.dup2(cWe, self.err_fd) # cWe = sys.stderr - # Ensure (within reason) stray file descriptors are closed: - excludes = [self.in_fd, self.out_fd, self.err_fd] - for i in range(4,100): - if i not in excludes: - try: os.close(i) - except os.error: pass - - try: - os.execvp(cmd[0], cmd) - os._exit(1) # Shouldn't get here - - except os.error as e: - if self.control_stderr: - os.dup2(parentErr, 2) # Reconnect to parent's stdout - sys.stderr.write("**execvp failed, '%s'**\n" % - str(e)) - os._exit(1) - os._exit(1) # Shouldn't get here. - - else: ### PARENT ### - # Connect to the child's file descriptors, using our customized - # fdopen: - self.toChild = os.fdopen(pWc, 'w') - self.toChild_fdlist = [pWc] - self.readbuf = ReadBuf(pRc) - self.errbuf = ReadBuf(pRe) - time.sleep(execvp_grace_seconds) - try: - pid, err = os.waitpid(self.pid, os.WNOHANG) - except os.error as error: - errno, msg = error - if errno == 10: - raise SubprocessError("Subprocess '%s' failed." % self.cmd) - raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg)) - if pid == self.pid: - # child exited already - self.pid == None - sig = err & 0xff - rc = (err & 0xff00) >> 8 - if sig: - raise SubprocessError( - "child killed by signal %d with a return code of %d" - % (sig, rc)) - if rc: - raise SubprocessError( - "child exited with return code %d" % rc) - # Child may have exited, but not in error, so we won't say - # anything more at this point. - - ### Write input to subprocess ### - - def write(self, str): - """Write a STRING to the subprocess.""" - - if not self.pid: - raise SubprocessError("no child") # ===> - if select.select([],self.toChild_fdlist,[],0)[1]: - self.toChild.write(str) - self.toChild.flush() - else: - # XXX Can write-buffer full be handled better?? - raise IOError("write to %s blocked" % self) # ===> - - def writeline(self, line=''): - """Write STRING, with added newline termination, to subprocess.""" - self.write(line + '\n') - - ### Get output from subprocess ### - - def peekPendingChar(self): - """Return, but (effectively) do not consume a single pending output - char, or return null string if none pending.""" - - return self.readbuf.peekPendingChar() # ===> - def peekPendingErrChar(self): - """Return, but (effectively) do not consume a single pending output - char, or return null string if none pending.""" - - return self.errbuf.peekPendingChar() # ===> - - def waitForPendingChar(self, timeout, pollPause=.1): - """Block max TIMEOUT secs until we peek a pending char, returning the - char, or '' if none encountered. - - Pause POLLPAUSE secs (default .1) between polls.""" - - accume = 0 - while 1: - nextChar = self.readbuf.peekPendingChar() - if nextChar or (accume > timeout): return nextChar - time.sleep(pollPause) - accume = accume + pollPause - - def read(self, n=None): - """Read N chars, or all pending if no N specified.""" - if n == None: - return self.readPendingChars() - got = '' - while n: - got0 = self.readPendingChars(n) - got = got + got0 - n = n - len(got0) - return got - def readPendingChars(self, max=None): - """Read all currently pending subprocess output as a single string.""" - return self.readbuf.readPendingChars(max) - def readPendingErrChars(self): - """Read all currently pending subprocess error output as a single - string.""" - if self.control_stderr: - return self.errbuf.readPendingChars() - else: - raise SubprocessError("Haven't grabbed subprocess error stream.") - - def readPendingLine(self): - """Read currently pending subprocess output, up to a complete line - (newline inclusive).""" - return self.readbuf.readPendingLine() - def readPendingErrLine(self): - """Read currently pending subprocess error output, up to a complete - line (newline inclusive).""" - if self.control_stderr: - return self.errbuf.readPendingLine() - else: - raise SubprocessError("Haven't grabbed subprocess error stream.") - - def readline(self): - """Return next complete line of subprocess output, blocking until - then.""" - return self.readbuf.readline() - def readlineErr(self): - """Return next complete line of subprocess error output, blocking until - then.""" - if self.control_stderr: - return self.errbuf.readline() - else: - raise SubprocessError("Haven't grabbed subprocess error stream.") - - ### Subprocess Control ### - - def active(self): - """True if subprocess is alive and kicking.""" - return self.status(boolean=1) - def status(self, boolean=0): - """Return string indicating whether process is alive or dead.""" - active = 0 - if not self.cmd: - status = 'sans command' - elif not self.pid: - status = 'sans process' - elif not self.cont(): - status = "(unresponding) '%s'" % self.cmd - else: - status = "'%s'" % self.cmd - active = 1 - if boolean: - return active - else: - return status - - def stop(self, verbose=1): - """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0 - otherwise.""" - try: - os.kill(self.pid, signal.SIGSTOP) - except os.error: - if verbose: - print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)) - return 0 - if verbose: print("Stopped '%s'" % self.cmd) - return 'stopped' - - def cont(self, verbose=0): - """Signal subprocess with CONT (19), returning 'continued' if ok, or 0 - otherwise.""" - try: - os.kill(self.pid, signal.SIGCONT) - except os.error: - if verbose: - print(("Continue failed for '%s' - '%s'" % - (self.cmd, sys.exc_value))) - return 0 - if verbose: print("Continued '%s'" % self.cmd) - return 'continued' - - def die(self): - """Send process PID signal SIG (default 9, 'kill'), returning None once - it is successfully reaped. - - SubprocessError is raised if process is not successfully killed.""" - - if not self.pid: - raise SubprocessError("No process") # ===> - elif not self.cont(): - raise SubprocessError("Can't signal subproc %s" % self) # ===> - - # Try sending first a TERM and then a KILL signal. - keep_trying = 1 - sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)] - for sig in sigs: - try: - os.kill(self.pid, sig[1]) - except posix.error: - # keep trying - pass - # Try a couple or three times to reap the process with waitpid: - for i in range(5): - # WNOHANG == 1 on sunos, presumably same elsewhere. - if os.waitpid(self.pid, os.WNOHANG): - if self.expire_noisily: - print(("\n(%s subproc %d '%s' / %s)" % - (sig[0], self.pid, self.cmd, - hex(id(self))[2:]))) - for i in self.pipefiles: - os.close(i) - self.pid = 0 - return None # ===> - time.sleep(.1) - # Only got here if subprocess is not gone: - raise SubprocessError( - "Failed kill of subproc %d, '%s', with signals %s" % - (self.pid, self.cmd, map(lambda x: x[0], sigs))) - - def __del__(self): - """Terminate the subprocess""" - if self.pid: - self.die() - - def __repr__(self): - status = self.status() - return '' - -############################################################################# -##### Non-blocking read operations ##### -############################################################################# - -class ReadBuf: - """Output buffer for non-blocking reads on selectable files like pipes and - sockets. Init with a file descriptor for the file.""" - - def __init__(self, fd, maxChunkSize=1024): - """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE - (default 1024).""" - - if fd < 0: - raise ValueError - self.fd = fd - self.eof = 0 # May be set with stuff still in .buf - self.buf = '' - self.chunkSize = maxChunkSize # Biggest read chunk, default 1024. - - def fileno(self): - return self.fd - - def peekPendingChar(self): - """Return, but don't consume, first character of unconsumed output from - file, or empty string if none.""" - - if self.buf: - return self.buf[0] # ===> - - if self.eof: - return '' # ===> - - sel = select.select([self.fd], [], [self.fd], 0) - if sel[2]: - self.eof = 1 - if sel[0]: - self.buf = os.read(self.fd, self.chunkSize) # ===> - return self.buf[0] # Assume select don't lie. - else: return '' # ===> - - - def readPendingChar(self): - """Consume first character of unconsumed output from file, or empty - string if none.""" - - if self.buf: - got, self.buf = self.buf[0], self.buf[1:] - return got # ===> - - if self.eof: - return '' # ===> - - sel = select.select([self.fd], [], [self.fd], 0) - if sel[2]: - self.eof = 1 - if sel[0]: - return os.read(self.fd, 1) # ===> - else: return '' # ===> - - def readPendingChars(self, max=None): - """Consume uncomsumed output from FILE, or empty string if nothing - pending.""" - - got = "" - if self.buf: - if (max > 0) and (len(self.buf) > max): - got = self.buf[0:max] - self.buf = self.buf[max:] - else: - got, self.buf = self.buf, '' - return got - - if self.eof: - return '' - - sel = select.select([self.fd], [], [self.fd], 0) - if sel[2]: - self.eof = 1 - if sel[0]: - got = got + os.read(self.fd, self.chunkSize) - if max == 0: - self.buf = got - return '' - elif max == None: - return got - elif len(got) > max: - self.buf = self.buf + got[max:] - return got[:max] - else: - return got - else: return '' - - def readPendingLine(self, block=0): - """Return pending output from FILE, up to first newline (inclusive). - - Does not block unless optional arg BLOCK is true. - - Note that an error will be raised if a new eof is encountered without - any newline.""" - - if self.buf: - to = string.find(self.buf, '\n') - if to != -1: - got, self.buf = self.buf[:to+1], self.buf[to+1:] - return got # ===> - got, self.buf = self.buf, '' - else: - if self.eof: - return '' # ===> - got = '' - - # Herein, 'got' contains the (former) contents of the buffer, and it - # doesn't include a newline. - fdlist = [self.fd] - period = block and 1.0 or 0 # don't be too busy while waiting - while 1: # (we'll only loop if block set) - sel = select.select(fdlist, [], fdlist, period) - if sel[2]: - self.eof = 1 - if sel[0]: - got = got + os.read(self.fd, self.chunkSize) - - to = string.find(got, '\n') - if to != -1: - got, self.buf = got[:to+1], got[to+1:] - return got # ===> - if not block: - return got # ===> - if self.eof: - self.buf = '' # this is how an ordinary file acts... - return got - # otherwise - no newline, blocking requested, no eof - loop. # ==^ - - def readline(self): - """Return next output line from file, blocking until it is received.""" - - return self.readPendingLine(1) # ===> - - -############################################################################# -##### Encapsulated reading and writing ##### -############################################################################# -# Encapsulate messages so the end can be unambiguously identified, even -# when they contain multiple, possibly empty lines. - -class RecordFile: - """Encapsulate stream object for record-oriented IO. - - Particularly useful when dealing with non-line oriented communications - over pipes, eg with subprocesses.""" - - # Message is written preceded by a line containing the message length. - - def __init__(self, f): - self.file = f - - def write_record(self, s): - "Write so self.read knows exactly how much to read." - f = self.__dict__['file'] - f.write("%s\n%s" % (len(s), s)) - if hasattr(f, 'flush'): - f.flush() - - def read_record(self): - "Read and reconstruct message as prepared by self.write." - f = self.__dict__['file'] - line = f.readline()[:-1] - if line: - try: - l = string.atoi(line) - except ValueError: - raise IOError(("corrupt %s file structure" - % self.__class__.__name__)) - return f.read(l) - else: - # EOF. - return '' - - def __getattr__(self, attr): - """Implement characteristic file object attributes.""" - f = self.__dict__['file'] - if hasattr(f, attr): - return getattr(f, attr) - else: - raise AttributeError(attr) - - def __repr__(self): - return "<%s of %s at %s>" % (self.__class__.__name__, - self.__dict__['file'], - hex(id(self))[2:]) - -def record_trial(s): - """Exercise encapsulated write/read with an arbitrary string. - - Raise IOError if the string gets distorted through transmission!""" - from StringIO import StringIO - sf = StringIO() - c = RecordFile(sf) - c.write(s) - c.seek(0) - r = c.read() - show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r)) - if r != s: - raise IOError("String distorted:\n%s" % show) - -############################################################################# -##### An example subprocess interfaces ##### -############################################################################# - -class Ph: - """Convenient interface to CCSO 'ph' nameserver subprocess. - - .query('string...') method takes a query and returns a list of dicts, each - of which represents one entry.""" - - # Note that i made this a class that handles a subprocess object, rather - # than one that inherits from it. I didn't see any functional - # disadvantages, and didn't think that full support of the entire - # Subprocess functionality was in any way suitable for interaction with - # this specialized interface. ? klm 13-Jan-1995 - - def __init__(self): - try: - self.proc = Subprocess('ph', 1) - except: - raise SubprocessError('failure starting ph: %s' % # ===> - str(sys.exc_value)) - - def query(self, q): - """Send a query and return a list of dicts for responses. - - Raise a ValueError if ph responds with an error.""" - - self.clear() - - self.proc.writeline('query ' + q) - got = []; it = {} - while 1: - response = self.getreply() # Should get null on new prompt. - errs = self.proc.readPendingErrChars() - if errs: - sys.stderr.write(errs) - if it: - got.append(it) - it = {} - if not response: - return got # ===> - elif type(response) == types.StringType: - raise ValueError("ph failed match: '%s'" % response) # ===> - for line in response: - # convert to a dict: - line = string.splitfields(line, ':') - it[string.strip(line[0])] = ( - string.strip(string.join(line[1:]))) - - def getreply(self): - """Consume next response from ph, returning list of lines or string - err.""" - # Key on first char: (First line may lack newline.) - # - dash discard line - # - 'ph> ' conclusion of response - # - number error message - # - whitespace beginning of next response - - nextChar = self.proc.waitForPendingChar(60) - if not nextChar: - raise SubprocessError('ph subprocess not responding') # ===> - elif nextChar == '-': - # dashed line - discard it, and continue reading: - self.proc.readline() - return self.getreply() # ===> - elif nextChar == 'p': - # 'ph> ' prompt - don't think we should hit this, but what the hay: - return '' # ===> - elif nextChar in '0123456789': - # Error notice - we're currently assuming single line errors: - return self.proc.readline()[:-1] # ===> - elif nextChar in ' \t': - # Get content, up to next dashed line: - got = [] - while nextChar != '-' and nextChar != '': - got.append(self.proc.readline()[:-1]) - nextChar = self.proc.peekPendingChar() - return got - def __repr__(self): - return "\n" % (self.proc.status(), - hex(id(self))[2:]) - def clear(self): - """Clear-out initial preface or residual subproc input and output.""" - pause = .5; maxIter = 10 # 5 seconds to clear - iterations = 0 - got = '' - self.proc.write('') - while iterations < maxIter: - got = got + self.proc.readPendingChars() - # Strip out all but the last incomplete line: - got = string.splitfields(got, '\n')[-1] - if got == 'ph> ': return # Ok. ===> - time.sleep(pause) - raise SubprocessError('ph not responding within %s secs' % - pause * maxIter) - -############################################################################# -##### Test ##### -############################################################################# - -def test(p=0): - print("\tOpening subprocess:") - p = Subprocess('cat', 1) # set to expire noisily... - print(p) - print("\tOpening bogus subprocess, should fail:") - try: - b = Subprocess('/', 1) - print("\tOops! Null-named subprocess startup *succeeded*?!?") - except SubprocessError: - print("\t...yep, it failed.") - print('\tWrite, then read, two newline-teriminated lines, using readline:') - p.write('first full line written\n'); p.write('second.\n') - print(repr(p.readline())) - print(repr(p.readline())) - print('\tThree lines, last sans newline, read using combination:') - p.write('first\n'); p.write('second\n'); p.write('third, (no cr)') - print('\tFirst line via readline:') - print(repr(p.readline())) - print('\tRest via readPendingChars:') - print(p.readPendingChars()) - print("\tStopping then continuing subprocess (verbose):") - if not p.stop(1): # verbose stop - print('\t** Stop seems to have failed!') - else: - print('\tWriting line while subprocess is paused...') - p.write('written while subprocess paused\n') - print('\tNonblocking read of paused subprocess (should be empty):') - print(p.readPendingChars()) - print('\tContinuing subprocess (verbose):') - if not p.cont(1): # verbose continue - print('\t** Continue seems to have failed! Probly lost subproc...') - return p - else: - print('\tReading accumulated line, blocking read:') - print(p.readline()) - print("\tDeleting subproc, which was set to die noisily:") - del p - print("\tDone.") - return None