1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 Subprocess class features:
8 - provides non-blocking stdin and stderr reads
10 - provides subprocess stop and continue, kill-on-deletion
12 - provides detection of subprocess startup failure
14 - Subprocess objects have nice, informative string rep (as every good object
17 __version__ = "Revision: 1.15 "
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 # and subprocesses, primarily one by jose pereira.
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 # because i don't know what all in it is portable and what is not. I'm not
28 # about to provide for different platform contingencies - at that extent, the
29 # effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 # checked on regular IO, when a handler is defined, and passed to the
32 # handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
35 # - new additions (1.3, 1.4):
36 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 # reads based solely on os.read with select, while capitalizing big-time on
38 # multi-char read chunking.
39 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
41 # ken.manheimer@nist.gov
44 import sys, os, string, time, types
49 SubprocessError = 'SubprocessError'
50 # You may need to increase execvp_grace_seconds, if you have a large or slow
52 execvp_grace_seconds = 0.5
55 """Run and communicate asynchronously with a subprocess.
57 Provides non-blocking reads in the form of .readPendingChars and
60 .readline will block until it gets a complete line.
62 .peekPendingChar does a non-blocking, non-consuming read for pending
63 output, and can be used before .readline to check non-destructively for
64 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
65 a new character is pending, or timeout secs pass, with granularity of
68 There are corresponding read and peekPendingErrXXX routines, to read from
69 the subprocess stderr stream."""
73 expire_noisily = 1 # Announce subproc destruction?
75 readbuf = 0 # fork will assign to be a readbuf obj
76 errbuf = 0 # fork will assign to be a readbuf obj
78 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
79 in_fd=0, out_fd=1, err_fd=2):
80 """Launch a subprocess, given command string COMMAND."""
83 self.expire_noisily = expire_noisily
84 self.control_stderr = control_stderr
85 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
88 def fork(self, cmd=None):
89 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
90 if cmd: self.cmd = cmd
92 cmd = string.split(self.cmd)
93 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
94 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
95 pRe, cWe = os.pipe() # parent-read-error, child-write-error
96 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
100 if self.pid == 0: #### CHILD ####
101 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
102 # Reopen stdin, out, err, on pipe ends:
103 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
104 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
105 if self.control_stderr:
106 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
107 # Ensure (within reason) stray file descriptors are closed:
108 excludes = [self.in_fd, self.out_fd, self.err_fd]
109 for i in range(4,100):
110 if i not in excludes:
112 except os.error: pass
115 os.execvp(cmd[0], cmd)
116 os._exit(1) # Shouldn't get here
119 if self.control_stderr:
120 os.dup2(parentErr, 2) # Reconnect to parent's stdout
121 sys.stderr.write("**execvp failed, '%s'**\n" %
124 os._exit(1) # Shouldn't get here.
127 # Connect to the child's file descriptors, using our customized
129 self.toChild = os.fdopen(pWc, 'w')
130 self.toChild_fdlist = [pWc]
131 self.readbuf = ReadBuf(pRc)
132 self.errbuf = ReadBuf(pRe)
133 time.sleep(execvp_grace_seconds)
135 pid, err = os.waitpid(self.pid, os.WNOHANG)
136 except os.error, (errno, msg):
138 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
139 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
141 # child exited already
144 rc = (err & 0xff00) >> 8
146 raise SubprocessError(
147 "child killed by signal %d with a return code of %d"
150 raise SubprocessError(
151 "child exited with return code %d" % rc)
152 # Child may have exited, but not in error, so we won't say
153 # anything more at this point.
155 ### Write input to subprocess ###
157 def write(self, str):
158 """Write a STRING to the subprocess."""
161 raise SubprocessError("no child") # ===>
162 if select.select([],self.toChild_fdlist,[],0)[1]:
163 self.toChild.write(str)
166 # XXX Can write-buffer full be handled better??
167 raise IOError("write to %s blocked" % self) # ===>
169 def writeline(self, line=''):
170 """Write STRING, with added newline termination, to subprocess."""
171 self.write(line + '\n')
173 ### Get output from subprocess ###
175 def peekPendingChar(self):
176 """Return, but (effectively) do not consume a single pending output
177 char, or return null string if none pending."""
179 return self.readbuf.peekPendingChar() # ===>
180 def peekPendingErrChar(self):
181 """Return, but (effectively) do not consume a single pending output
182 char, or return null string if none pending."""
184 return self.errbuf.peekPendingChar() # ===>
186 def waitForPendingChar(self, timeout, pollPause=.1):
187 """Block max TIMEOUT secs until we peek a pending char, returning the
188 char, or '' if none encountered.
190 Pause POLLPAUSE secs (default .1) between polls."""
194 nextChar = self.readbuf.peekPendingChar()
195 if nextChar or (accume > timeout): return nextChar
196 time.sleep(pollPause)
197 accume = accume + pollPause
199 def read(self, n=None):
200 """Read N chars, or all pending if no N specified."""
202 return self.readPendingChars()
205 got0 = self.readPendingChars(n)
209 def readPendingChars(self, max=None):
210 """Read all currently pending subprocess output as a single string."""
211 return self.readbuf.readPendingChars(max)
212 def readPendingErrChars(self):
213 """Read all currently pending subprocess error output as a single
215 if self.control_stderr:
216 return self.errbuf.readPendingChars()
218 raise SubprocessError("Haven't grabbed subprocess error stream.")
220 def readPendingLine(self):
221 """Read currently pending subprocess output, up to a complete line
222 (newline inclusive)."""
223 return self.readbuf.readPendingLine()
224 def readPendingErrLine(self):
225 """Read currently pending subprocess error output, up to a complete
226 line (newline inclusive)."""
227 if self.control_stderr:
228 return self.errbuf.readPendingLine()
230 raise SubprocessError("Haven't grabbed subprocess error stream.")
233 """Return next complete line of subprocess output, blocking until
235 return self.readbuf.readline()
236 def readlineErr(self):
237 """Return next complete line of subprocess error output, blocking until
239 if self.control_stderr:
240 return self.errbuf.readline()
242 raise SubprocessError("Haven't grabbed subprocess error stream.")
244 ### Subprocess Control ###
247 """True if subprocess is alive and kicking."""
248 return self.status(boolean=1)
249 def status(self, boolean=0):
250 """Return string indicating whether process is alive or dead."""
253 status = 'sans command'
255 status = 'sans process'
256 elif not self.cont():
257 status = "(unresponding) '%s'" % self.cmd
259 status = "'%s'" % self.cmd
266 def stop(self, verbose=1):
267 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
270 os.kill(self.pid, signal.SIGSTOP)
273 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
275 if verbose: print("Stopped '%s'" % self.cmd)
278 def cont(self, verbose=0):
279 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
282 os.kill(self.pid, signal.SIGCONT)
285 print(("Continue failed for '%s' - '%s'" %
286 (self.cmd, sys.exc_value)))
288 if verbose: print("Continued '%s'" % self.cmd)
292 """Send process PID signal SIG (default 9, 'kill'), returning None once
293 it is successfully reaped.
295 SubprocessError is raised if process is not successfully killed."""
298 raise SubprocessError("No process") # ===>
299 elif not self.cont():
300 raise SubprocessError("Can't signal subproc %s" % self) # ===>
302 # Try sending first a TERM and then a KILL signal.
304 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
307 os.kill(self.pid, sig[1])
311 # Try a couple or three times to reap the process with waitpid:
313 # WNOHANG == 1 on sunos, presumably same elsewhere.
314 if os.waitpid(self.pid, os.WNOHANG):
315 if self.expire_noisily:
316 print(("\n(%s subproc %d '%s' / %s)" %
317 (sig[0], self.pid, self.cmd,
319 for i in self.pipefiles:
324 # Only got here if subprocess is not gone:
325 raise SubprocessError(
326 "Failed kill of subproc %d, '%s', with signals %s" %
327 (self.pid, self.cmd, map(lambda(x): x[0], sigs)))
330 """Terminate the subprocess"""
335 status = self.status()
336 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
338 #############################################################################
339 ##### Non-blocking read operations #####
340 #############################################################################
343 """Output buffer for non-blocking reads on selectable files like pipes and
344 sockets. Init with a file descriptor for the file."""
346 def __init__(self, fd, maxChunkSize=1024):
347 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
353 self.eof = 0 # May be set with stuff still in .buf
355 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
360 def peekPendingChar(self):
361 """Return, but don't consume, first character of unconsumed output from
362 file, or empty string if none."""
365 return self.buf[0] # ===>
370 sel = select.select([self.fd], [], [self.fd], 0)
374 self.buf = os.read(self.fd, self.chunkSize) # ===>
375 return self.buf[0] # Assume select don't lie.
376 else: return '' # ===>
379 def readPendingChar(self):
380 """Consume first character of unconsumed output from file, or empty
384 got, self.buf = self.buf[0], self.buf[1:]
390 sel = select.select([self.fd], [], [self.fd], 0)
394 return os.read(self.fd, 1) # ===>
395 else: return '' # ===>
397 def readPendingChars(self, max=None):
398 """Consume uncomsumed output from FILE, or empty string if nothing
403 if (max > 0) and (len(self.buf) > max):
404 got = self.buf[0:max]
405 self.buf = self.buf[max:]
407 got, self.buf = self.buf, ''
413 sel = select.select([self.fd], [], [self.fd], 0)
417 got = got + os.read(self.fd, self.chunkSize)
424 self.buf = self.buf + got[max:]
430 def readPendingLine(self, block=0):
431 """Return pending output from FILE, up to first newline (inclusive).
433 Does not block unless optional arg BLOCK is true.
435 Note that an error will be raised if a new eof is encountered without
439 to = string.find(self.buf, '\n')
441 got, self.buf = self.buf[:to+1], self.buf[to+1:]
443 got, self.buf = self.buf, ''
449 # Herein, 'got' contains the (former) contents of the buffer, and it
450 # doesn't include a newline.
452 period = block and 1.0 or 0 # don't be too busy while waiting
453 while 1: # (we'll only loop if block set)
454 sel = select.select(fdlist, [], fdlist, period)
458 got = got + os.read(self.fd, self.chunkSize)
460 to = string.find(got, '\n')
462 got, self.buf = got[:to+1], got[to+1:]
467 self.buf = '' # this is how an ordinary file acts...
469 # otherwise - no newline, blocking requested, no eof - loop. # ==^
472 """Return next output line from file, blocking until it is received."""
474 return self.readPendingLine(1) # ===>
477 #############################################################################
478 ##### Encapsulated reading and writing #####
479 #############################################################################
480 # Encapsulate messages so the end can be unambiguously identified, even
481 # when they contain multiple, possibly empty lines.
484 """Encapsulate stream object for record-oriented IO.
486 Particularly useful when dealing with non-line oriented communications
487 over pipes, eg with subprocesses."""
489 # Message is written preceded by a line containing the message length.
491 def __init__(self, f):
494 def write_record(self, s):
495 "Write so self.read knows exactly how much to read."
496 f = self.__dict__['file']
497 f.write("%s\n%s" % (len(s), s))
498 if hasattr(f, 'flush'):
501 def read_record(self):
502 "Read and reconstruct message as prepared by self.write."
503 f = self.__dict__['file']
504 line = f.readline()[:-1]
507 l = string.atoi(line)
509 raise IOError(("corrupt %s file structure"
510 % self.__class__.__name__))
516 def __getattr__(self, attr):
517 """Implement characteristic file object attributes."""
518 f = self.__dict__['file']
520 return getattr(f, attr)
522 raise AttributeError(attr)
525 return "<%s of %s at %s>" % (self.__class__.__name__,
526 self.__dict__['file'],
530 """Exercise encapsulated write/read with an arbitrary string.
532 Raise IOError if the string gets distorted through transmission!"""
533 from StringIO import StringIO
539 show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
541 raise IOError("String distorted:\n%s" % show)
543 #############################################################################
544 ##### An example subprocess interfaces #####
545 #############################################################################
548 """Convenient interface to CCSO 'ph' nameserver subprocess.
550 .query('string...') method takes a query and returns a list of dicts, each
551 of which represents one entry."""
553 # Note that i made this a class that handles a subprocess object, rather
554 # than one that inherits from it. I didn't see any functional
555 # disadvantages, and didn't think that full support of the entire
556 # Subprocess functionality was in any way suitable for interaction with
557 # this specialized interface. ? klm 13-Jan-1995
561 self.proc = Subprocess('ph', 1)
563 raise SubprocessError('failure starting ph: %s' % # ===>
567 """Send a query and return a list of dicts for responses.
569 Raise a ValueError if ph responds with an error."""
573 self.proc.writeline('query ' + q)
576 response = self.getreply() # Should get null on new prompt.
577 errs = self.proc.readPendingErrChars()
579 sys.stderr.write(errs)
585 elif type(response) == types.StringType:
586 raise ValueError("ph failed match: '%s'" % response) # ===>
587 for line in response:
589 line = string.splitfields(line, ':')
590 it[string.strip(line[0])] = (
591 string.strip(string.join(line[1:])))
594 """Consume next response from ph, returning list of lines or string
596 # Key on first char: (First line may lack newline.)
597 # - dash discard line
598 # - 'ph> ' conclusion of response
599 # - number error message
600 # - whitespace beginning of next response
602 nextChar = self.proc.waitForPendingChar(60)
604 raise SubprocessError('ph subprocess not responding') # ===>
605 elif nextChar == '-':
606 # dashed line - discard it, and continue reading:
608 return self.getreply() # ===>
609 elif nextChar == 'p':
610 # 'ph> ' prompt - don't think we should hit this, but what the hay:
612 elif nextChar in '0123456789':
613 # Error notice - we're currently assuming single line errors:
614 return self.proc.readline()[:-1] # ===>
615 elif nextChar in ' \t':
616 # Get content, up to next dashed line:
618 while nextChar != '-' and nextChar != '':
619 got.append(self.proc.readline()[:-1])
620 nextChar = self.proc.peekPendingChar()
623 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
626 """Clear-out initial preface or residual subproc input and output."""
627 pause = .5; maxIter = 10 # 5 seconds to clear
631 while iterations < maxIter:
632 got = got + self.proc.readPendingChars()
633 # Strip out all but the last incomplete line:
634 got = string.splitfields(got, '\n')[-1]
635 if got == 'ph> ': return # Ok. ===>
637 raise SubprocessError('ph not responding within %s secs' %
640 #############################################################################
642 #############################################################################
645 print("\tOpening subprocess:")
646 p = Subprocess('cat', 1) # set to expire noisily...
648 print("\tOpening bogus subprocess, should fail:")
650 b = Subprocess('/', 1)
651 print("\tOops! Null-named subprocess startup *succeeded*?!?")
652 except SubprocessError:
653 print("\t...yep, it failed.")
654 print('\tWrite, then read, two newline-teriminated lines, using readline:')
655 p.write('first full line written\n'); p.write('second.\n')
656 print(`p.readline()`)
657 print(`p.readline()`)
658 print('\tThree lines, last sans newline, read using combination:')
659 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
660 print('\tFirst line via readline:')
661 print(`p.readline()`)
662 print('\tRest via readPendingChars:')
663 print(p.readPendingChars())
664 print("\tStopping then continuing subprocess (verbose):")
665 if not p.stop(1): # verbose stop
666 print('\t** Stop seems to have failed!')
668 print('\tWriting line while subprocess is paused...')
669 p.write('written while subprocess paused\n')
670 print('\tNonblocking read of paused subprocess (should be empty):')
671 print(p.readPendingChars())
672 print('\tContinuing subprocess (verbose):')
673 if not p.cont(1): # verbose continue
674 print('\t** Continue seems to have failed! Probly lost subproc...')
677 print('\tReading accumulated line, blocking read:')
679 print("\tDeleting subproc, which was set to die noisily:")