+#! /usr/bin/env python
+
"""Run a subprocess and communicate with it via stdin, stdout, and stderr.
Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
- Subprocess objects have nice, informative string rep (as every good object
ought)."""
-__version__ = "Revision: 1.15 "
+__version__ = "Revision: 2.0 "
-# Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
+# Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
# Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
# Prior art: Initially based python code examples demonstrating usage of pipes
#
# ken.manheimer@nist.gov
+# This is a modified version by Oleg Broytman <phd@phdru.name>.
+# The original version is still preserved at
+# https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
-import sys, os, string, time, types
+import os
import select
import signal
+import sys
+import time
+
+
+class SubprocessError(Exception):
+ pass
-SubprocessError = 'SubprocessError'
# You may need to increase execvp_grace_seconds, if you have a large or slow
# path to search:
execvp_grace_seconds = 0.5
+
class Subprocess:
"""Run and communicate asynchronously with a subprocess.
def fork(self, cmd=None):
"""Fork a subprocess with designated COMMAND (default, self.cmd)."""
- if cmd: self.cmd = cmd
- else: cmd = self.cmd
- cmd = string.split(self.cmd)
+ if cmd:
+ self.cmd = cmd
+ cmd = self.cmd.split()
pRc, cWp = os.pipe() # parent-read-child, child-write-parent
cRp, pWc = os.pipe() # child-read-parent, parent-write-child
pRe, cWe = os.pipe() # parent-read-error, child-write-error
self.pid = os.fork()
- if self.pid == 0: #### CHILD ####
- parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
+ if self.pid == 0: #### CHILD #### noqa: E262
+ # Preserve handle on *parent* stderr
+ parentErr = os.dup(self.in_fd)
# Reopen stdin, out, err, on pipe ends:
os.dup2(cRp, self.in_fd) # cRp = sys.stdin
os.dup2(cWp, self.out_fd) # cWp = sys.stdout
os.dup2(cWe, self.err_fd) # cWe = sys.stderr
# Ensure (within reason) stray file descriptors are closed:
excludes = [self.in_fd, self.out_fd, self.err_fd]
- for i in range(4,100):
+ for i in range(4, 100):
if i not in excludes:
- try: os.close(i)
- except os.error: pass
+ try:
+ os.close(i)
+ except os.error:
+ pass
try:
os.execvp(cmd[0], cmd)
os._exit(1) # Shouldn't get here
- except os.error, e:
+ except os.error as e:
if self.control_stderr:
os.dup2(parentErr, 2) # Reconnect to parent's stdout
sys.stderr.write("**execvp failed, '%s'**\n" %
os._exit(1)
os._exit(1) # Shouldn't get here.
- else: ### PARENT ###
+ else: ### PARENT ### noqa: E262
# Connect to the child's file descriptors, using our customized
# fdopen:
self.toChild = os.fdopen(pWc, 'w')
time.sleep(execvp_grace_seconds)
try:
pid, err = os.waitpid(self.pid, os.WNOHANG)
- except os.error, (errno, msg):
+ except os.error as error:
+ errno, msg = error
if errno == 10:
- raise SubprocessError, \
- "Subprocess '%s' failed." % self.cmd
- raise SubprocessError, \
- "Subprocess failed[%d]: %s" % (errno, msg)
+ self.pid = None
+ raise SubprocessError("Subprocess '%s' failed." % self.cmd)
+ self.pid = None
+ raise SubprocessError(
+ "Subprocess failed[%d]: %s" % (errno, msg))
if pid == self.pid:
# child exited already
- self.pid == None
+ self.pid = None
sig = err & 0xff
rc = (err & 0xff00) >> 8
if sig:
- raise SubprocessError, (
- "child killed by signal %d with a return code of %d"
- % (sig, rc))
+ raise SubprocessError(
+ "child killed by signal %d with a return code of %d"
+ % (sig, rc))
if rc:
- raise SubprocessError, \
- "child exited with return code %d" % rc
+ raise SubprocessError(
+ "child exited with return code %d" % rc)
# Child may have exited, but not in error, so we won't say
# anything more at this point.
- ### Write input to subprocess ###
+ ### Write input to subprocess ### noqa: E266
def write(self, str):
"""Write a STRING to the subprocess."""
if not self.pid:
- raise SubprocessError, "no child" # ===>
- if select.select([],self.toChild_fdlist,[],0)[1]:
+ raise SubprocessError("no child") # ===>
+ if select.select([], self.toChild_fdlist, [], 0)[1]:
self.toChild.write(str)
self.toChild.flush()
else:
# XXX Can write-buffer full be handled better??
- raise IOError, "write to %s blocked" % self # ===>
+ raise IOError("write to %s blocked" % self) # ===>
def writeline(self, line=''):
"""Write STRING, with added newline termination, to subprocess."""
self.write(line + '\n')
- ### Get output from subprocess ###
+ ### Get output from subprocess ### noqa: E266
def peekPendingChar(self):
"""Return, but (effectively) do not consume a single pending output
char, or return null string if none pending."""
return self.readbuf.peekPendingChar() # ===>
+
def peekPendingErrChar(self):
"""Return, but (effectively) do not consume a single pending output
char, or return null string if none pending."""
accume = 0
while 1:
nextChar = self.readbuf.peekPendingChar()
- if nextChar or (accume > timeout): return nextChar
+ if nextChar or (accume > timeout):
+ return nextChar
time.sleep(pollPause)
accume = accume + pollPause
def read(self, n=None):
"""Read N chars, or all pending if no N specified."""
- if n == None:
+ if n is None:
return self.readPendingChars()
got = ''
while n:
got0 = self.readPendingChars(n)
got = got + got0
n = n - len(got0)
- return got
+ return got
+
def readPendingChars(self, max=None):
"""Read all currently pending subprocess output as a single string."""
return self.readbuf.readPendingChars(max)
+
def readPendingErrChars(self):
"""Read all currently pending subprocess error output as a single
string."""
if self.control_stderr:
return self.errbuf.readPendingChars()
else:
- raise SubprocessError, "Haven't grabbed subprocess error stream."
+ raise SubprocessError("Haven't grabbed subprocess error stream.")
def readPendingLine(self):
"""Read currently pending subprocess output, up to a complete line
(newline inclusive)."""
return self.readbuf.readPendingLine()
+
def readPendingErrLine(self):
"""Read currently pending subprocess error output, up to a complete
line (newline inclusive)."""
if self.control_stderr:
return self.errbuf.readPendingLine()
else:
- raise SubprocessError, "Haven't grabbed subprocess error stream."
+ raise SubprocessError("Haven't grabbed subprocess error stream.")
def readline(self):
"""Return next complete line of subprocess output, blocking until
then."""
return self.readbuf.readline()
+
def readlineErr(self):
"""Return next complete line of subprocess error output, blocking until
then."""
if self.control_stderr:
return self.errbuf.readline()
else:
- raise SubprocessError, "Haven't grabbed subprocess error stream."
+ raise SubprocessError("Haven't grabbed subprocess error stream.")
- ### Subprocess Control ###
+ ### Subprocess Control ### noqa: E266
def active(self):
"""True if subprocess is alive and kicking."""
return self.status(boolean=1)
+
def status(self, boolean=0):
"""Return string indicating whether process is alive or dead."""
active = 0
os.kill(self.pid, signal.SIGSTOP)
except os.error:
if verbose:
- print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
+ print(
+ "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
return 0
- if verbose: print("Stopped '%s'" % self.cmd)
+ if verbose:
+ print("Stopped '%s'" % self.cmd)
return 'stopped'
def cont(self, verbose=0):
print(("Continue failed for '%s' - '%s'" %
(self.cmd, sys.exc_value)))
return 0
- if verbose: print("Continued '%s'" % self.cmd)
+ if verbose:
+ print("Continued '%s'" % self.cmd)
return 'continued'
def die(self):
SubprocessError is raised if process is not successfully killed."""
if not self.pid:
- raise SubprocessError, "No process" # ===>
+ raise SubprocessError("No process") # ===>
elif not self.cont():
- raise SubprocessError, "Can't signal subproc %s" % self # ===>
+ raise SubprocessError("Can't signal subproc %s" % self) # ===>
# Try sending first a TERM and then a KILL signal.
- keep_trying = 1
sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
for sig in sigs:
try:
os.kill(self.pid, sig[1])
- except posix.error:
+ except OSError:
# keep trying
pass
# Try a couple or three times to reap the process with waitpid:
(sig[0], self.pid, self.cmd,
hex(id(self))[2:])))
for i in self.pipefiles:
- os.close(i)
+ try:
+ os.fdopen(i).close()
+ except OSError:
+ pass
+ del self.pipefiles[:]
self.pid = 0
return None # ===>
time.sleep(.1)
# Only got here if subprocess is not gone:
- raise (SubprocessError,
- ("Failed kill of subproc %d, '%s', with signals %s" %
- (self.pid, self.cmd, map(lambda(x): x[0], sigs))))
+ raise SubprocessError(
+ "Failed kill of subproc %d, '%s', with signals %s" %
+ (self.pid, self.cmd, map(lambda x: x[0], sigs)))
def __del__(self):
"""Terminate the subprocess"""
status = self.status()
return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
-#############################################################################
-##### Non-blocking read operations #####
-#############################################################################
+
+##############################################################################
+##### Non-blocking read operations ##### noqa: E266
+##############################################################################
+
class ReadBuf:
"""Output buffer for non-blocking reads on selectable files like pipes and
if sel[0]:
self.buf = os.read(self.fd, self.chunkSize) # ===>
return self.buf[0] # Assume select don't lie.
- else: return '' # ===>
-
+ else:
+ return '' # ===>
def readPendingChar(self):
"""Consume first character of unconsumed output from file, or empty
self.eof = 1
if sel[0]:
return os.read(self.fd, 1) # ===>
- else: return '' # ===>
+ else:
+ return '' # ===>
def readPendingChars(self, max=None):
"""Consume uncomsumed output from FILE, or empty string if nothing
got = ""
if self.buf:
- if (max > 0) and (len(self.buf) > max):
- got = self.buf[0:max]
- self.buf = self.buf[max:]
- else:
- got, self.buf = self.buf, ''
- return got
+ if (max > 0) and (len(self.buf) > max):
+ got = self.buf[0:max]
+ self.buf = self.buf[max:]
+ else:
+ got, self.buf = self.buf, ''
+ return got
if self.eof:
- return ''
+ return ''
sel = select.select([self.fd], [], [self.fd], 0)
if sel[2]:
if max == 0:
self.buf = got
return ''
- elif max == None:
+ elif max is None:
return got
elif len(got) > max:
self.buf = self.buf + got[max:]
return got[:max]
else:
return got
- else: return ''
+ else:
+ return ''
def readPendingLine(self, block=0):
"""Return pending output from FILE, up to first newline (inclusive).
any newline."""
if self.buf:
- to = string.find(self.buf, '\n')
+ to = self.buf.find('\n')
if to != -1:
got, self.buf = self.buf[:to+1], self.buf[to+1:]
return got # ===>
if sel[0]:
got = got + os.read(self.fd, self.chunkSize)
- to = string.find(got, '\n')
+ to = got.find('\n')
if to != -1:
got, self.buf = got[:to+1], got[to+1:]
return got # ===>
#############################################################################
-##### Encapsulated reading and writing #####
+##### Encapsulated reading and writing ##### noqa: E266
#############################################################################
# Encapsulate messages so the end can be unambiguously identified, even
# when they contain multiple, possibly empty lines.
+
class RecordFile:
"""Encapsulate stream object for record-oriented IO.
line = f.readline()[:-1]
if line:
try:
- l = string.atoi(line)
+ l = int(line)
except ValueError:
- raise IOError, ("corrupt %s file structure"
- % self.__class__.__name__)
+ raise IOError(("corrupt %s file structure"
+ % self.__class__.__name__))
return f.read(l)
else:
# EOF.
if hasattr(f, attr):
return getattr(f, attr)
else:
- raise AttributeError, attr
+ raise AttributeError(attr)
def __repr__(self):
return "<%s of %s at %s>" % (self.__class__.__name__,
self.__dict__['file'],
hex(id(self))[2:])
+
def record_trial(s):
"""Exercise encapsulated write/read with an arbitrary string.
c.write(s)
c.seek(0)
r = c.read()
- show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
+ show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
if r != s:
- raise IOError, "String distorted:\n%s" % show
+ raise IOError("String distorted:\n%s" % show)
+
#############################################################################
-##### An example subprocess interfaces #####
+##### An example subprocess interfaces ##### noqa: E266
#############################################################################
+
class Ph:
"""Convenient interface to CCSO 'ph' nameserver subprocess.
try:
self.proc = Subprocess('ph', 1)
except:
- raise SubprocessError, ('failure starting ph: %s' % # ===>
- str(sys.exc_value))
+ raise SubprocessError('failure starting ph: %s' % # ===>
+ str(sys.exc_value))
def query(self, q):
"""Send a query and return a list of dicts for responses.
self.clear()
self.proc.writeline('query ' + q)
- got = []; it = {}
+ got = []
+ it = {}
while 1:
response = self.getreply() # Should get null on new prompt.
errs = self.proc.readPendingErrChars()
it = {}
if not response:
return got # ===>
- elif type(response) == types.StringType:
- raise ValueError, "ph failed match: '%s'" % response # ===>
+ elif isinstance(response, str):
+ raise ValueError("ph failed match: '%s'" % response) # ===>
for line in response:
# convert to a dict:
- line = string.splitfields(line, ':')
- it[string.strip(line[0])] = (
- string.strip(string.join(line[1:])))
-
+ line = line.split(':')
+ it[line.strip([0])] = (''.join(line[1:])).strip()
+
def getreply(self):
"""Consume next response from ph, returning list of lines or string
err."""
nextChar = self.proc.waitForPendingChar(60)
if not nextChar:
- raise SubprocessError, 'ph subprocess not responding' # ===>
+ raise SubprocessError('ph subprocess not responding') # ===>
elif nextChar == '-':
# dashed line - discard it, and continue reading:
self.proc.readline()
got.append(self.proc.readline()[:-1])
nextChar = self.proc.peekPendingChar()
return got
+
def __repr__(self):
return "<Ph instance, %s at %s>\n" % (self.proc.status(),
- hex(id(self))[2:])
+ hex(id(self))[2:])
+
def clear(self):
"""Clear-out initial preface or residual subproc input and output."""
- pause = .5; maxIter = 10 # 5 seconds to clear
+ pause = .5
+ maxIter = 10 # 5 seconds to clear
iterations = 0
got = ''
self.proc.write('')
while iterations < maxIter:
got = got + self.proc.readPendingChars()
# Strip out all but the last incomplete line:
- got = string.splitfields(got, '\n')[-1]
- if got == 'ph> ': return # Ok. ===>
+ got = got.split('\n')[-1]
+ if got == 'ph> ':
+ return # Ok. ===>
time.sleep(pause)
- raise SubprocessError, ('ph not responding within %s secs' %
- pause * maxIter)
+ raise SubprocessError('ph not responding within %s secs' %
+ pause * maxIter)
+
+
+##############################################################################
+##### Test ##### noqa: E266
+##############################################################################
-#############################################################################
-##### Test #####
-#############################################################################
def test(p=0):
- print("\tOpening subprocess:")
- p = Subprocess('cat', 1) # set to expire noisily...
- print(p)
print("\tOpening bogus subprocess, should fail:")
try:
- b = Subprocess('/', 1)
+ Subprocess('/', 1)
print("\tOops! Null-named subprocess startup *succeeded*?!?")
except SubprocessError:
print("\t...yep, it failed.")
+ print("\tOpening cat subprocess:")
+ p = Subprocess('cat', 1) # set to expire noisily...
+ print(p)
print('\tWrite, then read, two newline-teriminated lines, using readline:')
- p.write('first full line written\n'); p.write('second.\n')
- print(`p.readline()`)
- print(`p.readline()`)
+ p.write('first full line written\n')
+ p.write('second.\n')
+ print(repr(p.readline()))
+ print(repr(p.readline()))
print('\tThree lines, last sans newline, read using combination:')
- p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
+ p.write('first\n')
+ p.write('second\n')
+ p.write('third, (no cr)')
print('\tFirst line via readline:')
- print(`p.readline()`)
+ print(repr(p.readline()))
print('\tRest via readPendingChars:')
print(p.readPendingChars())
print("\tStopping then continuing subprocess (verbose):")
print(p.readPendingChars())
print('\tContinuing subprocess (verbose):')
if not p.cont(1): # verbose continue
- print('\t** Continue seems to have failed! Probly lost subproc...')
+ print(
+ '\t** Continue seems to have failed! Probly lost subproc...')
return p
else:
print('\tReading accumulated line, blocking read:')
del p
print("\tDone.")
return None
+
+
+if __name__ == '__main__':
+ test()