X-Git-Url: https://git.phdru.name/?p=bookmarks_db.git;a=blobdiff_plain;f=subproc.py;h=e3c2717da8a4207ba0a099e02d9278bebeda7dd0;hp=0280aad6c9fcc10fa0d87c562f44fb6b08eacf3a;hb=HEAD;hpb=c88cb7a75e7caf1d67466cfa107981d95115fa0c diff --git a/subproc.py b/subproc.py old mode 100644 new mode 100755 index 0280aad..37cb253 --- a/subproc.py +++ b/subproc.py @@ -1,3 +1,5 @@ +#! /usr/bin/env python3 + """Run a subprocess and communicate with it via stdin, stdout, and stderr. Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork' @@ -14,7 +16,7 @@ Subprocess class features: - Subprocess objects have nice, informative string rep (as every good object ought).""" -__version__ = "Revision: 1.15 " +__version__ = "Revision: 2.0 " # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995. @@ -40,17 +42,26 @@ __version__ = "Revision: 1.15 " # # ken.manheimer@nist.gov +# This is a modified version by Oleg Broytman . +# The original version is still preserved at +# https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz -import sys, os, string, time, types +import os import select import signal +import sys +import time + + +class SubprocessError(Exception): + pass -SubprocessError = 'SubprocessError' # You may need to increase execvp_grace_seconds, if you have a large or slow # path to search: execvp_grace_seconds = 0.5 + class Subprocess: """Run and communicate asynchronously with a subprocess. @@ -69,7 +80,7 @@ class Subprocess: the subprocess stderr stream.""" pid = 0 - cmd = '' + cmd = b'' expire_noisily = 1 # Announce subproc destruction? pipefiles = [] readbuf = 0 # fork will assign to be a readbuf obj @@ -87,9 +98,9 @@ class Subprocess: def fork(self, cmd=None): """Fork a subprocess with designated COMMAND (default, self.cmd).""" - if cmd: self.cmd = cmd - else: cmd = self.cmd - cmd = string.split(self.cmd) + if cmd: + self.cmd = cmd + cmd = self.cmd.split() pRc, cWp = os.pipe() # parent-read-child, child-write-parent cRp, pWc = os.pipe() # child-read-parent, parent-write-child pRe, cWe = os.pipe() # parent-read-error, child-write-error @@ -97,8 +108,9 @@ class Subprocess: self.pid = os.fork() - if self.pid == 0: #### CHILD #### - parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr + if self.pid == 0: #### CHILD #### noqa: E262 + # Preserve handle on *parent* stderr + parentErr = os.dup(self.in_fd) # Reopen stdin, out, err, on pipe ends: os.dup2(cRp, self.in_fd) # cRp = sys.stdin os.dup2(cWp, self.out_fd) # cWp = sys.stdout @@ -106,10 +118,12 @@ class Subprocess: os.dup2(cWe, self.err_fd) # cWe = sys.stderr # Ensure (within reason) stray file descriptors are closed: excludes = [self.in_fd, self.out_fd, self.err_fd] - for i in range(4,100): + for i in range(4, 100): if i not in excludes: - try: os.close(i) - except os.error: pass + try: + os.close(i) + except os.error: + pass try: os.execvp(cmd[0], cmd) @@ -123,10 +137,10 @@ class Subprocess: os._exit(1) os._exit(1) # Shouldn't get here. - else: ### PARENT ### + else: ### PARENT ### noqa: E262 # Connect to the child's file descriptors, using our customized # fdopen: - self.toChild = os.fdopen(pWc, 'w') + self.toChild = os.fdopen(pWc, 'wb') self.toChild_fdlist = [pWc] self.readbuf = ReadBuf(pRc) self.errbuf = ReadBuf(pRe) @@ -136,48 +150,52 @@ class Subprocess: except os.error as error: errno, msg = error if errno == 10: + self.pid = None raise SubprocessError("Subprocess '%s' failed." % self.cmd) - raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg)) + self.pid = None + raise SubprocessError( + "Subprocess failed[%d]: %s" % (errno, msg)) if pid == self.pid: # child exited already - self.pid == None + self.pid = None sig = err & 0xff rc = (err & 0xff00) >> 8 if sig: raise SubprocessError( - "child killed by signal %d with a return code of %d" - % (sig, rc)) + "child killed by signal %d with a return code of %d" + % (sig, rc)) if rc: raise SubprocessError( "child exited with return code %d" % rc) # Child may have exited, but not in error, so we won't say # anything more at this point. - ### Write input to subprocess ### + ### Write input to subprocess ### noqa: E266 def write(self, str): """Write a STRING to the subprocess.""" if not self.pid: raise SubprocessError("no child") # ===> - if select.select([],self.toChild_fdlist,[],0)[1]: + if select.select([], self.toChild_fdlist, [], 0)[1]: self.toChild.write(str) self.toChild.flush() else: # XXX Can write-buffer full be handled better?? raise IOError("write to %s blocked" % self) # ===> - def writeline(self, line=''): + def writeline(self, line=b''): """Write STRING, with added newline termination, to subprocess.""" - self.write(line + '\n') + self.write(line + b'\n') - ### Get output from subprocess ### + ### Get output from subprocess ### noqa: E266 def peekPendingChar(self): """Return, but (effectively) do not consume a single pending output char, or return null string if none pending.""" return self.readbuf.peekPendingChar() # ===> + def peekPendingErrChar(self): """Return, but (effectively) do not consume a single pending output char, or return null string if none pending.""" @@ -193,23 +211,26 @@ class Subprocess: accume = 0 while 1: nextChar = self.readbuf.peekPendingChar() - if nextChar or (accume > timeout): return nextChar + if nextChar or (accume > timeout): + return nextChar time.sleep(pollPause) accume = accume + pollPause def read(self, n=None): """Read N chars, or all pending if no N specified.""" - if n == None: + if n is None: return self.readPendingChars() - got = '' + got = b'' while n: got0 = self.readPendingChars(n) got = got + got0 n = n - len(got0) return got + def readPendingChars(self, max=None): """Read all currently pending subprocess output as a single string.""" return self.readbuf.readPendingChars(max) + def readPendingErrChars(self): """Read all currently pending subprocess error output as a single string.""" @@ -222,6 +243,7 @@ class Subprocess: """Read currently pending subprocess output, up to a complete line (newline inclusive).""" return self.readbuf.readPendingLine() + def readPendingErrLine(self): """Read currently pending subprocess error output, up to a complete line (newline inclusive).""" @@ -234,6 +256,7 @@ class Subprocess: """Return next complete line of subprocess output, blocking until then.""" return self.readbuf.readline() + def readlineErr(self): """Return next complete line of subprocess error output, blocking until then.""" @@ -242,11 +265,12 @@ class Subprocess: else: raise SubprocessError("Haven't grabbed subprocess error stream.") - ### Subprocess Control ### + ### Subprocess Control ### noqa: E266 def active(self): """True if subprocess is alive and kicking.""" return self.status(boolean=1) + def status(self, boolean=0): """Return string indicating whether process is alive or dead.""" active = 0 @@ -271,9 +295,11 @@ class Subprocess: os.kill(self.pid, signal.SIGSTOP) except os.error: if verbose: - print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)) + print( + "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)) return 0 - if verbose: print("Stopped '%s'" % self.cmd) + if verbose: + print("Stopped '%s'" % self.cmd) return 'stopped' def cont(self, verbose=0): @@ -286,7 +312,8 @@ class Subprocess: print(("Continue failed for '%s' - '%s'" % (self.cmd, sys.exc_value))) return 0 - if verbose: print("Continued '%s'" % self.cmd) + if verbose: + print("Continued '%s'" % self.cmd) return 'continued' def die(self): @@ -301,12 +328,11 @@ class Subprocess: raise SubprocessError("Can't signal subproc %s" % self) # ===> # Try sending first a TERM and then a KILL signal. - keep_trying = 1 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)] for sig in sigs: try: os.kill(self.pid, sig[1]) - except posix.error: + except OSError: # keep trying pass # Try a couple or three times to reap the process with waitpid: @@ -318,14 +344,18 @@ class Subprocess: (sig[0], self.pid, self.cmd, hex(id(self))[2:]))) for i in self.pipefiles: - os.close(i) + try: + os.fdopen(i).close() + except OSError: + pass + del self.pipefiles[:] self.pid = 0 return None # ===> time.sleep(.1) # Only got here if subprocess is not gone: raise SubprocessError( "Failed kill of subproc %d, '%s', with signals %s" % - (self.pid, self.cmd, map(lambda x: x[0], sigs))) + (self.pid, self.cmd, map(lambda x: x[0], sigs))) def __del__(self): """Terminate the subprocess""" @@ -336,9 +366,11 @@ class Subprocess: status = self.status() return '' -############################################################################# -##### Non-blocking read operations ##### -############################################################################# + +############################################################################## +##### Non-blocking read operations ##### noqa: E266 +############################################################################## + class ReadBuf: """Output buffer for non-blocking reads on selectable files like pipes and @@ -352,7 +384,7 @@ class ReadBuf: raise ValueError self.fd = fd self.eof = 0 # May be set with stuff still in .buf - self.buf = '' + self.buf = b'' self.chunkSize = maxChunkSize # Biggest read chunk, default 1024. def fileno(self): @@ -366,7 +398,7 @@ class ReadBuf: return self.buf[0] # ===> if self.eof: - return '' # ===> + return b'' # ===> sel = select.select([self.fd], [], [self.fd], 0) if sel[2]: @@ -374,8 +406,8 @@ class ReadBuf: if sel[0]: self.buf = os.read(self.fd, self.chunkSize) # ===> return self.buf[0] # Assume select don't lie. - else: return '' # ===> - + else: + return b'' # ===> def readPendingChar(self): """Consume first character of unconsumed output from file, or empty @@ -386,30 +418,31 @@ class ReadBuf: return got # ===> if self.eof: - return '' # ===> + return b'' # ===> sel = select.select([self.fd], [], [self.fd], 0) if sel[2]: self.eof = 1 if sel[0]: return os.read(self.fd, 1) # ===> - else: return '' # ===> + else: + return b'' # ===> def readPendingChars(self, max=None): """Consume uncomsumed output from FILE, or empty string if nothing pending.""" - got = "" + got = b"" if self.buf: - if (max > 0) and (len(self.buf) > max): + if max and (len(self.buf) > max): got = self.buf[0:max] self.buf = self.buf[max:] else: - got, self.buf = self.buf, '' + got, self.buf = self.buf, b'' return got if self.eof: - return '' + return b'' sel = select.select([self.fd], [], [self.fd], 0) if sel[2]: @@ -418,15 +451,16 @@ class ReadBuf: got = got + os.read(self.fd, self.chunkSize) if max == 0: self.buf = got - return '' - elif max == None: + return b'' + elif max is None: return got elif len(got) > max: self.buf = self.buf + got[max:] return got[:max] else: return got - else: return '' + else: + return b'' def readPendingLine(self, block=0): """Return pending output from FILE, up to first newline (inclusive). @@ -437,15 +471,15 @@ class ReadBuf: any newline.""" if self.buf: - to = string.find(self.buf, '\n') + to = self.buf.find(b'\n') if to != -1: got, self.buf = self.buf[:to+1], self.buf[to+1:] return got # ===> - got, self.buf = self.buf, '' + got, self.buf = self.buf, b'' else: if self.eof: - return '' # ===> - got = '' + return b'' # ===> + got = b'' # Herein, 'got' contains the (former) contents of the buffer, and it # doesn't include a newline. @@ -458,14 +492,14 @@ class ReadBuf: if sel[0]: got = got + os.read(self.fd, self.chunkSize) - to = string.find(got, '\n') + to = got.find(b'\n') if to != -1: got, self.buf = got[:to+1], got[to+1:] return got # ===> if not block: return got # ===> if self.eof: - self.buf = '' # this is how an ordinary file acts... + self.buf = b'' # this is how an ordinary file acts... return got # otherwise - no newline, blocking requested, no eof - loop. # ==^ @@ -476,11 +510,12 @@ class ReadBuf: ############################################################################# -##### Encapsulated reading and writing ##### +##### Encapsulated reading and writing ##### noqa: E266 ############################################################################# # Encapsulate messages so the end can be unambiguously identified, even # when they contain multiple, possibly empty lines. + class RecordFile: """Encapsulate stream object for record-oriented IO. @@ -495,7 +530,7 @@ class RecordFile: def write_record(self, s): "Write so self.read knows exactly how much to read." f = self.__dict__['file'] - f.write("%s\n%s" % (len(s), s)) + f.write(b"%d\n%s" % (len(s), s)) if hasattr(f, 'flush'): f.flush() @@ -505,14 +540,14 @@ class RecordFile: line = f.readline()[:-1] if line: try: - l = string.atoi(line) + _l = int(line) except ValueError: raise IOError(("corrupt %s file structure" - % self.__class__.__name__)) - return f.read(l) + % self.__class__.__name__)) + return f.read(_l) else: # EOF. - return '' + return b'' def __getattr__(self, attr): """Implement characteristic file object attributes.""" @@ -527,6 +562,7 @@ class RecordFile: self.__dict__['file'], hex(id(self))[2:]) + def record_trial(s): """Exercise encapsulated write/read with an arbitrary string. @@ -541,10 +577,12 @@ def record_trial(s): if r != s: raise IOError("String distorted:\n%s" % show) + ############################################################################# -##### An example subprocess interfaces ##### +##### An example subprocess interfaces ##### noqa: E266 ############################################################################# + class Ph: """Convenient interface to CCSO 'ph' nameserver subprocess. @@ -562,7 +600,7 @@ class Ph: self.proc = Subprocess('ph', 1) except: raise SubprocessError('failure starting ph: %s' % # ===> - str(sys.exc_value)) + str(sys.exc_value)) def query(self, q): """Send a query and return a list of dicts for responses. @@ -571,8 +609,9 @@ class Ph: self.clear() - self.proc.writeline('query ' + q) - got = []; it = {} + self.proc.writeline(b'query ' + q) + got = [] + it = {} while 1: response = self.getreply() # Should get null on new prompt. errs = self.proc.readPendingErrChars() @@ -583,13 +622,12 @@ class Ph: it = {} if not response: return got # ===> - elif type(response) == types.StringType: + elif isinstance(response, str): raise ValueError("ph failed match: '%s'" % response) # ===> for line in response: # convert to a dict: - line = string.splitfields(line, ':') - it[string.strip(line[0])] = ( - string.strip(string.join(line[1:]))) + line = line.split(b':') + it[line.strip([0])] = (b''.join(line[1:])).strip() def getreply(self): """Consume next response from ph, returning list of lines or string @@ -603,61 +641,70 @@ class Ph: nextChar = self.proc.waitForPendingChar(60) if not nextChar: raise SubprocessError('ph subprocess not responding') # ===> - elif nextChar == '-': + elif nextChar == b'-': # dashed line - discard it, and continue reading: self.proc.readline() return self.getreply() # ===> - elif nextChar == 'p': + elif nextChar == b'p': # 'ph> ' prompt - don't think we should hit this, but what the hay: return '' # ===> - elif nextChar in '0123456789': + elif nextChar in b'0123456789': # Error notice - we're currently assuming single line errors: return self.proc.readline()[:-1] # ===> - elif nextChar in ' \t': + elif nextChar in b' \t': # Get content, up to next dashed line: got = [] - while nextChar != '-' and nextChar != '': + while nextChar != b'-' and nextChar != b'': got.append(self.proc.readline()[:-1]) nextChar = self.proc.peekPendingChar() return got + def __repr__(self): return "\n" % (self.proc.status(), - hex(id(self))[2:]) + hex(id(self))[2:]) + def clear(self): """Clear-out initial preface or residual subproc input and output.""" - pause = .5; maxIter = 10 # 5 seconds to clear + pause = .5 + maxIter = 10 # 5 seconds to clear iterations = 0 - got = '' - self.proc.write('') + got = b'' + self.proc.write(b'') while iterations < maxIter: got = got + self.proc.readPendingChars() # Strip out all but the last incomplete line: - got = string.splitfields(got, '\n')[-1] - if got == 'ph> ': return # Ok. ===> + got = got.split(b'\n')[-1] + if got == b'ph> ': + return # Ok. ===> time.sleep(pause) raise SubprocessError('ph not responding within %s secs' % - pause * maxIter) + pause * maxIter) + + +############################################################################## +##### Test ##### noqa: E266 +############################################################################## -############################################################################# -##### Test ##### -############################################################################# def test(p=0): - print("\tOpening subprocess:") - p = Subprocess('cat', 1) # set to expire noisily... - print(p) print("\tOpening bogus subprocess, should fail:") try: - b = Subprocess('/', 1) + Subprocess('/', 1) print("\tOops! Null-named subprocess startup *succeeded*?!?") except SubprocessError: print("\t...yep, it failed.") + print("\tOpening cat subprocess:") + p = Subprocess('cat', 1) # set to expire noisily... + print(p) print('\tWrite, then read, two newline-teriminated lines, using readline:') - p.write('first full line written\n'); p.write('second.\n') + p.write(b'first full line written\n') + p.write(b'second.\n') print(repr(p.readline())) print(repr(p.readline())) print('\tThree lines, last sans newline, read using combination:') - p.write('first\n'); p.write('second\n'); p.write('third, (no cr)') + p.write(b'first\n') + p.write(b'second\n') + p.write(b'third, (no cr)') print('\tFirst line via readline:') print(repr(p.readline())) print('\tRest via readPendingChars:') @@ -667,12 +714,13 @@ def test(p=0): print('\t** Stop seems to have failed!') else: print('\tWriting line while subprocess is paused...') - p.write('written while subprocess paused\n') + p.write(b'written while subprocess paused\n') print('\tNonblocking read of paused subprocess (should be empty):') print(p.readPendingChars()) print('\tContinuing subprocess (verbose):') if not p.cont(1): # verbose continue - print('\t** Continue seems to have failed! Probly lost subproc...') + print( + '\t** Continue seems to have failed! Probly lost subproc...') return p else: print('\tReading accumulated line, blocking read:') @@ -681,3 +729,7 @@ def test(p=0): del p print("\tDone.") return None + + +if __name__ == '__main__': + test()