3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
8 Subprocess class features:
10 - provides non-blocking stdin and stderr reads
12 - provides subprocess stop and continue, kill-on-deletion
14 - provides detection of subprocess startup failure
16 - Subprocess objects have nice, informative string rep (as every good object
19 __version__ = "Revision: 1.15 "
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 # and subprocesses, primarily one by jose pereira.
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 # because i don't know what all in it is portable and what is not. I'm not
30 # about to provide for different platform contingencies - at that extent, the
31 # effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 # checked on regular IO, when a handler is defined, and passed to the
34 # handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
37 # - new additions (1.3, 1.4):
38 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 # reads based solely on os.read with select, while capitalizing big-time on
40 # multi-char read chunking.
41 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
43 # ken.manheimer@nist.gov
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
49 import sys, os, string, time, types
54 class SubprocessError(Exception):
57 # You may need to increase execvp_grace_seconds, if you have a large or slow
59 execvp_grace_seconds = 0.5
62 """Run and communicate asynchronously with a subprocess.
64 Provides non-blocking reads in the form of .readPendingChars and
67 .readline will block until it gets a complete line.
69 .peekPendingChar does a non-blocking, non-consuming read for pending
70 output, and can be used before .readline to check non-destructively for
71 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
72 a new character is pending, or timeout secs pass, with granularity of
75 There are corresponding read and peekPendingErrXXX routines, to read from
76 the subprocess stderr stream."""
80 expire_noisily = 1 # Announce subproc destruction?
82 readbuf = 0 # fork will assign to be a readbuf obj
83 errbuf = 0 # fork will assign to be a readbuf obj
85 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
86 in_fd=0, out_fd=1, err_fd=2):
87 """Launch a subprocess, given command string COMMAND."""
90 self.expire_noisily = expire_noisily
91 self.control_stderr = control_stderr
92 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
95 def fork(self, cmd=None):
96 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
97 if cmd: self.cmd = cmd
99 cmd = string.split(self.cmd)
100 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
101 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
102 pRe, cWe = os.pipe() # parent-read-error, child-write-error
103 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
107 if self.pid == 0: #### CHILD ####
108 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
109 # Reopen stdin, out, err, on pipe ends:
110 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
111 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
112 if self.control_stderr:
113 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
114 # Ensure (within reason) stray file descriptors are closed:
115 excludes = [self.in_fd, self.out_fd, self.err_fd]
116 for i in range(4,100):
117 if i not in excludes:
119 except os.error: pass
122 os.execvp(cmd[0], cmd)
123 os._exit(1) # Shouldn't get here
125 except os.error as e:
126 if self.control_stderr:
127 os.dup2(parentErr, 2) # Reconnect to parent's stdout
128 sys.stderr.write("**execvp failed, '%s'**\n" %
131 os._exit(1) # Shouldn't get here.
134 # Connect to the child's file descriptors, using our customized
136 self.toChild = os.fdopen(pWc, 'w')
137 self.toChild_fdlist = [pWc]
138 self.readbuf = ReadBuf(pRc)
139 self.errbuf = ReadBuf(pRe)
140 time.sleep(execvp_grace_seconds)
142 pid, err = os.waitpid(self.pid, os.WNOHANG)
143 except os.error as error:
147 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
149 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
151 # child exited already
154 rc = (err & 0xff00) >> 8
156 raise SubprocessError(
157 "child killed by signal %d with a return code of %d"
161 raise SubprocessError(
162 "child exited with return code %d" % rc)
163 # Child may have exited, but not in error, so we won't say
164 # anything more at this point.
166 ### Write input to subprocess ###
168 def write(self, str):
169 """Write a STRING to the subprocess."""
172 raise SubprocessError("no child") # ===>
173 if select.select([],self.toChild_fdlist,[],0)[1]:
174 self.toChild.write(str)
177 # XXX Can write-buffer full be handled better??
178 raise IOError("write to %s blocked" % self) # ===>
180 def writeline(self, line=''):
181 """Write STRING, with added newline termination, to subprocess."""
182 self.write(line + '\n')
184 ### Get output from subprocess ###
186 def peekPendingChar(self):
187 """Return, but (effectively) do not consume a single pending output
188 char, or return null string if none pending."""
190 return self.readbuf.peekPendingChar() # ===>
191 def peekPendingErrChar(self):
192 """Return, but (effectively) do not consume a single pending output
193 char, or return null string if none pending."""
195 return self.errbuf.peekPendingChar() # ===>
197 def waitForPendingChar(self, timeout, pollPause=.1):
198 """Block max TIMEOUT secs until we peek a pending char, returning the
199 char, or '' if none encountered.
201 Pause POLLPAUSE secs (default .1) between polls."""
205 nextChar = self.readbuf.peekPendingChar()
206 if nextChar or (accume > timeout): return nextChar
207 time.sleep(pollPause)
208 accume = accume + pollPause
210 def read(self, n=None):
211 """Read N chars, or all pending if no N specified."""
213 return self.readPendingChars()
216 got0 = self.readPendingChars(n)
220 def readPendingChars(self, max=None):
221 """Read all currently pending subprocess output as a single string."""
222 return self.readbuf.readPendingChars(max)
223 def readPendingErrChars(self):
224 """Read all currently pending subprocess error output as a single
226 if self.control_stderr:
227 return self.errbuf.readPendingChars()
229 raise SubprocessError("Haven't grabbed subprocess error stream.")
231 def readPendingLine(self):
232 """Read currently pending subprocess output, up to a complete line
233 (newline inclusive)."""
234 return self.readbuf.readPendingLine()
235 def readPendingErrLine(self):
236 """Read currently pending subprocess error output, up to a complete
237 line (newline inclusive)."""
238 if self.control_stderr:
239 return self.errbuf.readPendingLine()
241 raise SubprocessError("Haven't grabbed subprocess error stream.")
244 """Return next complete line of subprocess output, blocking until
246 return self.readbuf.readline()
247 def readlineErr(self):
248 """Return next complete line of subprocess error output, blocking until
250 if self.control_stderr:
251 return self.errbuf.readline()
253 raise SubprocessError("Haven't grabbed subprocess error stream.")
255 ### Subprocess Control ###
258 """True if subprocess is alive and kicking."""
259 return self.status(boolean=1)
260 def status(self, boolean=0):
261 """Return string indicating whether process is alive or dead."""
264 status = 'sans command'
266 status = 'sans process'
267 elif not self.cont():
268 status = "(unresponding) '%s'" % self.cmd
270 status = "'%s'" % self.cmd
277 def stop(self, verbose=1):
278 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
281 os.kill(self.pid, signal.SIGSTOP)
284 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
286 if verbose: print("Stopped '%s'" % self.cmd)
289 def cont(self, verbose=0):
290 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
293 os.kill(self.pid, signal.SIGCONT)
296 print(("Continue failed for '%s' - '%s'" %
297 (self.cmd, sys.exc_value)))
299 if verbose: print("Continued '%s'" % self.cmd)
303 """Send process PID signal SIG (default 9, 'kill'), returning None once
304 it is successfully reaped.
306 SubprocessError is raised if process is not successfully killed."""
309 raise SubprocessError("No process") # ===>
310 elif not self.cont():
311 raise SubprocessError("Can't signal subproc %s" % self) # ===>
313 # Try sending first a TERM and then a KILL signal.
315 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
318 os.kill(self.pid, sig[1])
322 # Try a couple or three times to reap the process with waitpid:
324 # WNOHANG == 1 on sunos, presumably same elsewhere.
325 if os.waitpid(self.pid, os.WNOHANG):
326 if self.expire_noisily:
327 print(("\n(%s subproc %d '%s' / %s)" %
328 (sig[0], self.pid, self.cmd,
330 for i in self.pipefiles:
335 # Only got here if subprocess is not gone:
336 raise SubprocessError(
337 "Failed kill of subproc %d, '%s', with signals %s" %
338 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
341 """Terminate the subprocess"""
346 status = self.status()
347 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
349 #############################################################################
350 ##### Non-blocking read operations #####
351 #############################################################################
354 """Output buffer for non-blocking reads on selectable files like pipes and
355 sockets. Init with a file descriptor for the file."""
357 def __init__(self, fd, maxChunkSize=1024):
358 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
364 self.eof = 0 # May be set with stuff still in .buf
366 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
371 def peekPendingChar(self):
372 """Return, but don't consume, first character of unconsumed output from
373 file, or empty string if none."""
376 return self.buf[0] # ===>
381 sel = select.select([self.fd], [], [self.fd], 0)
385 self.buf = os.read(self.fd, self.chunkSize) # ===>
386 return self.buf[0] # Assume select don't lie.
387 else: return '' # ===>
390 def readPendingChar(self):
391 """Consume first character of unconsumed output from file, or empty
395 got, self.buf = self.buf[0], self.buf[1:]
401 sel = select.select([self.fd], [], [self.fd], 0)
405 return os.read(self.fd, 1) # ===>
406 else: return '' # ===>
408 def readPendingChars(self, max=None):
409 """Consume uncomsumed output from FILE, or empty string if nothing
414 if (max > 0) and (len(self.buf) > max):
415 got = self.buf[0:max]
416 self.buf = self.buf[max:]
418 got, self.buf = self.buf, ''
424 sel = select.select([self.fd], [], [self.fd], 0)
428 got = got + os.read(self.fd, self.chunkSize)
435 self.buf = self.buf + got[max:]
441 def readPendingLine(self, block=0):
442 """Return pending output from FILE, up to first newline (inclusive).
444 Does not block unless optional arg BLOCK is true.
446 Note that an error will be raised if a new eof is encountered without
450 to = string.find(self.buf, '\n')
452 got, self.buf = self.buf[:to+1], self.buf[to+1:]
454 got, self.buf = self.buf, ''
460 # Herein, 'got' contains the (former) contents of the buffer, and it
461 # doesn't include a newline.
463 period = block and 1.0 or 0 # don't be too busy while waiting
464 while 1: # (we'll only loop if block set)
465 sel = select.select(fdlist, [], fdlist, period)
469 got = got + os.read(self.fd, self.chunkSize)
471 to = string.find(got, '\n')
473 got, self.buf = got[:to+1], got[to+1:]
478 self.buf = '' # this is how an ordinary file acts...
480 # otherwise - no newline, blocking requested, no eof - loop. # ==^
483 """Return next output line from file, blocking until it is received."""
485 return self.readPendingLine(1) # ===>
488 #############################################################################
489 ##### Encapsulated reading and writing #####
490 #############################################################################
491 # Encapsulate messages so the end can be unambiguously identified, even
492 # when they contain multiple, possibly empty lines.
495 """Encapsulate stream object for record-oriented IO.
497 Particularly useful when dealing with non-line oriented communications
498 over pipes, eg with subprocesses."""
500 # Message is written preceded by a line containing the message length.
502 def __init__(self, f):
505 def write_record(self, s):
506 "Write so self.read knows exactly how much to read."
507 f = self.__dict__['file']
508 f.write("%s\n%s" % (len(s), s))
509 if hasattr(f, 'flush'):
512 def read_record(self):
513 "Read and reconstruct message as prepared by self.write."
514 f = self.__dict__['file']
515 line = f.readline()[:-1]
518 l = string.atoi(line)
520 raise IOError(("corrupt %s file structure"
521 % self.__class__.__name__))
527 def __getattr__(self, attr):
528 """Implement characteristic file object attributes."""
529 f = self.__dict__['file']
531 return getattr(f, attr)
533 raise AttributeError(attr)
536 return "<%s of %s at %s>" % (self.__class__.__name__,
537 self.__dict__['file'],
541 """Exercise encapsulated write/read with an arbitrary string.
543 Raise IOError if the string gets distorted through transmission!"""
544 from StringIO import StringIO
550 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
552 raise IOError("String distorted:\n%s" % show)
554 #############################################################################
555 ##### An example subprocess interfaces #####
556 #############################################################################
559 """Convenient interface to CCSO 'ph' nameserver subprocess.
561 .query('string...') method takes a query and returns a list of dicts, each
562 of which represents one entry."""
564 # Note that i made this a class that handles a subprocess object, rather
565 # than one that inherits from it. I didn't see any functional
566 # disadvantages, and didn't think that full support of the entire
567 # Subprocess functionality was in any way suitable for interaction with
568 # this specialized interface. ? klm 13-Jan-1995
572 self.proc = Subprocess('ph', 1)
574 raise SubprocessError('failure starting ph: %s' % # ===>
578 """Send a query and return a list of dicts for responses.
580 Raise a ValueError if ph responds with an error."""
584 self.proc.writeline('query ' + q)
587 response = self.getreply() # Should get null on new prompt.
588 errs = self.proc.readPendingErrChars()
590 sys.stderr.write(errs)
596 elif type(response) == types.StringType:
597 raise ValueError("ph failed match: '%s'" % response) # ===>
598 for line in response:
600 line = string.splitfields(line, ':')
601 it[string.strip(line[0])] = (
602 string.strip(string.join(line[1:])))
605 """Consume next response from ph, returning list of lines or string
607 # Key on first char: (First line may lack newline.)
608 # - dash discard line
609 # - 'ph> ' conclusion of response
610 # - number error message
611 # - whitespace beginning of next response
613 nextChar = self.proc.waitForPendingChar(60)
615 raise SubprocessError('ph subprocess not responding') # ===>
616 elif nextChar == '-':
617 # dashed line - discard it, and continue reading:
619 return self.getreply() # ===>
620 elif nextChar == 'p':
621 # 'ph> ' prompt - don't think we should hit this, but what the hay:
623 elif nextChar in '0123456789':
624 # Error notice - we're currently assuming single line errors:
625 return self.proc.readline()[:-1] # ===>
626 elif nextChar in ' \t':
627 # Get content, up to next dashed line:
629 while nextChar != '-' and nextChar != '':
630 got.append(self.proc.readline()[:-1])
631 nextChar = self.proc.peekPendingChar()
634 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
637 """Clear-out initial preface or residual subproc input and output."""
638 pause = .5; maxIter = 10 # 5 seconds to clear
642 while iterations < maxIter:
643 got = got + self.proc.readPendingChars()
644 # Strip out all but the last incomplete line:
645 got = string.splitfields(got, '\n')[-1]
646 if got == 'ph> ': return # Ok. ===>
648 raise SubprocessError('ph not responding within %s secs' %
651 #############################################################################
653 #############################################################################
656 print("\tOpening bogus subprocess, should fail:")
658 b = Subprocess('/', 1)
659 print("\tOops! Null-named subprocess startup *succeeded*?!?")
660 except SubprocessError:
661 print("\t...yep, it failed.")
662 print("\tOpening cat subprocess:")
663 p = Subprocess('cat', 1) # set to expire noisily...
665 print('\tWrite, then read, two newline-teriminated lines, using readline:')
666 p.write('first full line written\n'); p.write('second.\n')
667 print(repr(p.readline()))
668 print(repr(p.readline()))
669 print('\tThree lines, last sans newline, read using combination:')
670 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
671 print('\tFirst line via readline:')
672 print(repr(p.readline()))
673 print('\tRest via readPendingChars:')
674 print(p.readPendingChars())
675 print("\tStopping then continuing subprocess (verbose):")
676 if not p.stop(1): # verbose stop
677 print('\t** Stop seems to have failed!')
679 print('\tWriting line while subprocess is paused...')
680 p.write('written while subprocess paused\n')
681 print('\tNonblocking read of paused subprocess (should be empty):')
682 print(p.readPendingChars())
683 print('\tContinuing subprocess (verbose):')
684 if not p.cont(1): # verbose continue
685 print('\t** Continue seems to have failed! Probly lost subproc...')
688 print('\tReading accumulated line, blocking read:')
690 print("\tDeleting subproc, which was set to die noisily:")
695 if __name__ == '__main__':