3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
8 Subprocess class features:
10 - provides non-blocking stdin and stderr reads
12 - provides subprocess stop and continue, kill-on-deletion
14 - provides detection of subprocess startup failure
16 - Subprocess objects have nice, informative string rep (as every good object
19 __version__ = "Revision: 1.15 "
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 # and subprocesses, primarily one by jose pereira.
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 # because i don't know what all in it is portable and what is not. I'm not
30 # about to provide for different platform contingencies - at that extent, the
31 # effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 # checked on regular IO, when a handler is defined, and passed to the
34 # handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
37 # - new additions (1.3, 1.4):
38 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 # reads based solely on os.read with select, while capitalizing big-time on
40 # multi-char read chunking.
41 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
43 # ken.manheimer@nist.gov
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
49 import sys, os, string, time, types
54 class SubprocessError(Exception):
57 # You may need to increase execvp_grace_seconds, if you have a large or slow
59 execvp_grace_seconds = 0.5
62 """Run and communicate asynchronously with a subprocess.
64 Provides non-blocking reads in the form of .readPendingChars and
67 .readline will block until it gets a complete line.
69 .peekPendingChar does a non-blocking, non-consuming read for pending
70 output, and can be used before .readline to check non-destructively for
71 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
72 a new character is pending, or timeout secs pass, with granularity of
75 There are corresponding read and peekPendingErrXXX routines, to read from
76 the subprocess stderr stream."""
80 expire_noisily = 1 # Announce subproc destruction?
82 readbuf = 0 # fork will assign to be a readbuf obj
83 errbuf = 0 # fork will assign to be a readbuf obj
85 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
86 in_fd=0, out_fd=1, err_fd=2):
87 """Launch a subprocess, given command string COMMAND."""
90 self.expire_noisily = expire_noisily
91 self.control_stderr = control_stderr
92 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
95 def fork(self, cmd=None):
96 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
97 if cmd: self.cmd = cmd
99 cmd = string.split(self.cmd)
100 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
101 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
102 pRe, cWe = os.pipe() # parent-read-error, child-write-error
103 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
107 if self.pid == 0: #### CHILD ####
108 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
109 # Reopen stdin, out, err, on pipe ends:
110 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
111 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
112 if self.control_stderr:
113 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
114 # Ensure (within reason) stray file descriptors are closed:
115 excludes = [self.in_fd, self.out_fd, self.err_fd]
116 for i in range(4,100):
117 if i not in excludes:
119 except os.error: pass
122 os.execvp(cmd[0], cmd)
123 os._exit(1) # Shouldn't get here
125 except os.error as e:
126 if self.control_stderr:
127 os.dup2(parentErr, 2) # Reconnect to parent's stdout
128 sys.stderr.write("**execvp failed, '%s'**\n" %
131 os._exit(1) # Shouldn't get here.
134 # Connect to the child's file descriptors, using our customized
136 self.toChild = os.fdopen(pWc, 'w')
137 self.toChild_fdlist = [pWc]
138 self.readbuf = ReadBuf(pRc)
139 self.errbuf = ReadBuf(pRe)
140 time.sleep(execvp_grace_seconds)
142 pid, err = os.waitpid(self.pid, os.WNOHANG)
143 except os.error as error:
146 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
147 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
149 # child exited already
152 rc = (err & 0xff00) >> 8
154 raise SubprocessError(
155 "child killed by signal %d with a return code of %d"
158 raise SubprocessError(
159 "child exited with return code %d" % rc)
160 # Child may have exited, but not in error, so we won't say
161 # anything more at this point.
163 ### Write input to subprocess ###
165 def write(self, str):
166 """Write a STRING to the subprocess."""
169 raise SubprocessError("no child") # ===>
170 if select.select([],self.toChild_fdlist,[],0)[1]:
171 self.toChild.write(str)
174 # XXX Can write-buffer full be handled better??
175 raise IOError("write to %s blocked" % self) # ===>
177 def writeline(self, line=''):
178 """Write STRING, with added newline termination, to subprocess."""
179 self.write(line + '\n')
181 ### Get output from subprocess ###
183 def peekPendingChar(self):
184 """Return, but (effectively) do not consume a single pending output
185 char, or return null string if none pending."""
187 return self.readbuf.peekPendingChar() # ===>
188 def peekPendingErrChar(self):
189 """Return, but (effectively) do not consume a single pending output
190 char, or return null string if none pending."""
192 return self.errbuf.peekPendingChar() # ===>
194 def waitForPendingChar(self, timeout, pollPause=.1):
195 """Block max TIMEOUT secs until we peek a pending char, returning the
196 char, or '' if none encountered.
198 Pause POLLPAUSE secs (default .1) between polls."""
202 nextChar = self.readbuf.peekPendingChar()
203 if nextChar or (accume > timeout): return nextChar
204 time.sleep(pollPause)
205 accume = accume + pollPause
207 def read(self, n=None):
208 """Read N chars, or all pending if no N specified."""
210 return self.readPendingChars()
213 got0 = self.readPendingChars(n)
217 def readPendingChars(self, max=None):
218 """Read all currently pending subprocess output as a single string."""
219 return self.readbuf.readPendingChars(max)
220 def readPendingErrChars(self):
221 """Read all currently pending subprocess error output as a single
223 if self.control_stderr:
224 return self.errbuf.readPendingChars()
226 raise SubprocessError("Haven't grabbed subprocess error stream.")
228 def readPendingLine(self):
229 """Read currently pending subprocess output, up to a complete line
230 (newline inclusive)."""
231 return self.readbuf.readPendingLine()
232 def readPendingErrLine(self):
233 """Read currently pending subprocess error output, up to a complete
234 line (newline inclusive)."""
235 if self.control_stderr:
236 return self.errbuf.readPendingLine()
238 raise SubprocessError("Haven't grabbed subprocess error stream.")
241 """Return next complete line of subprocess output, blocking until
243 return self.readbuf.readline()
244 def readlineErr(self):
245 """Return next complete line of subprocess error output, blocking until
247 if self.control_stderr:
248 return self.errbuf.readline()
250 raise SubprocessError("Haven't grabbed subprocess error stream.")
252 ### Subprocess Control ###
255 """True if subprocess is alive and kicking."""
256 return self.status(boolean=1)
257 def status(self, boolean=0):
258 """Return string indicating whether process is alive or dead."""
261 status = 'sans command'
263 status = 'sans process'
264 elif not self.cont():
265 status = "(unresponding) '%s'" % self.cmd
267 status = "'%s'" % self.cmd
274 def stop(self, verbose=1):
275 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
278 os.kill(self.pid, signal.SIGSTOP)
281 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
283 if verbose: print("Stopped '%s'" % self.cmd)
286 def cont(self, verbose=0):
287 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
290 os.kill(self.pid, signal.SIGCONT)
293 print(("Continue failed for '%s' - '%s'" %
294 (self.cmd, sys.exc_value)))
296 if verbose: print("Continued '%s'" % self.cmd)
300 """Send process PID signal SIG (default 9, 'kill'), returning None once
301 it is successfully reaped.
303 SubprocessError is raised if process is not successfully killed."""
306 raise SubprocessError("No process") # ===>
307 elif not self.cont():
308 raise SubprocessError("Can't signal subproc %s" % self) # ===>
310 # Try sending first a TERM and then a KILL signal.
312 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
315 os.kill(self.pid, sig[1])
319 # Try a couple or three times to reap the process with waitpid:
321 # WNOHANG == 1 on sunos, presumably same elsewhere.
322 if os.waitpid(self.pid, os.WNOHANG):
323 if self.expire_noisily:
324 print(("\n(%s subproc %d '%s' / %s)" %
325 (sig[0], self.pid, self.cmd,
327 for i in self.pipefiles:
332 # Only got here if subprocess is not gone:
333 raise SubprocessError(
334 "Failed kill of subproc %d, '%s', with signals %s" %
335 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
338 """Terminate the subprocess"""
343 status = self.status()
344 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
346 #############################################################################
347 ##### Non-blocking read operations #####
348 #############################################################################
351 """Output buffer for non-blocking reads on selectable files like pipes and
352 sockets. Init with a file descriptor for the file."""
354 def __init__(self, fd, maxChunkSize=1024):
355 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
361 self.eof = 0 # May be set with stuff still in .buf
363 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
368 def peekPendingChar(self):
369 """Return, but don't consume, first character of unconsumed output from
370 file, or empty string if none."""
373 return self.buf[0] # ===>
378 sel = select.select([self.fd], [], [self.fd], 0)
382 self.buf = os.read(self.fd, self.chunkSize) # ===>
383 return self.buf[0] # Assume select don't lie.
384 else: return '' # ===>
387 def readPendingChar(self):
388 """Consume first character of unconsumed output from file, or empty
392 got, self.buf = self.buf[0], self.buf[1:]
398 sel = select.select([self.fd], [], [self.fd], 0)
402 return os.read(self.fd, 1) # ===>
403 else: return '' # ===>
405 def readPendingChars(self, max=None):
406 """Consume uncomsumed output from FILE, or empty string if nothing
411 if (max > 0) and (len(self.buf) > max):
412 got = self.buf[0:max]
413 self.buf = self.buf[max:]
415 got, self.buf = self.buf, ''
421 sel = select.select([self.fd], [], [self.fd], 0)
425 got = got + os.read(self.fd, self.chunkSize)
432 self.buf = self.buf + got[max:]
438 def readPendingLine(self, block=0):
439 """Return pending output from FILE, up to first newline (inclusive).
441 Does not block unless optional arg BLOCK is true.
443 Note that an error will be raised if a new eof is encountered without
447 to = string.find(self.buf, '\n')
449 got, self.buf = self.buf[:to+1], self.buf[to+1:]
451 got, self.buf = self.buf, ''
457 # Herein, 'got' contains the (former) contents of the buffer, and it
458 # doesn't include a newline.
460 period = block and 1.0 or 0 # don't be too busy while waiting
461 while 1: # (we'll only loop if block set)
462 sel = select.select(fdlist, [], fdlist, period)
466 got = got + os.read(self.fd, self.chunkSize)
468 to = string.find(got, '\n')
470 got, self.buf = got[:to+1], got[to+1:]
475 self.buf = '' # this is how an ordinary file acts...
477 # otherwise - no newline, blocking requested, no eof - loop. # ==^
480 """Return next output line from file, blocking until it is received."""
482 return self.readPendingLine(1) # ===>
485 #############################################################################
486 ##### Encapsulated reading and writing #####
487 #############################################################################
488 # Encapsulate messages so the end can be unambiguously identified, even
489 # when they contain multiple, possibly empty lines.
492 """Encapsulate stream object for record-oriented IO.
494 Particularly useful when dealing with non-line oriented communications
495 over pipes, eg with subprocesses."""
497 # Message is written preceded by a line containing the message length.
499 def __init__(self, f):
502 def write_record(self, s):
503 "Write so self.read knows exactly how much to read."
504 f = self.__dict__['file']
505 f.write("%s\n%s" % (len(s), s))
506 if hasattr(f, 'flush'):
509 def read_record(self):
510 "Read and reconstruct message as prepared by self.write."
511 f = self.__dict__['file']
512 line = f.readline()[:-1]
515 l = string.atoi(line)
517 raise IOError(("corrupt %s file structure"
518 % self.__class__.__name__))
524 def __getattr__(self, attr):
525 """Implement characteristic file object attributes."""
526 f = self.__dict__['file']
528 return getattr(f, attr)
530 raise AttributeError(attr)
533 return "<%s of %s at %s>" % (self.__class__.__name__,
534 self.__dict__['file'],
538 """Exercise encapsulated write/read with an arbitrary string.
540 Raise IOError if the string gets distorted through transmission!"""
541 from StringIO import StringIO
547 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
549 raise IOError("String distorted:\n%s" % show)
551 #############################################################################
552 ##### An example subprocess interfaces #####
553 #############################################################################
556 """Convenient interface to CCSO 'ph' nameserver subprocess.
558 .query('string...') method takes a query and returns a list of dicts, each
559 of which represents one entry."""
561 # Note that i made this a class that handles a subprocess object, rather
562 # than one that inherits from it. I didn't see any functional
563 # disadvantages, and didn't think that full support of the entire
564 # Subprocess functionality was in any way suitable for interaction with
565 # this specialized interface. ? klm 13-Jan-1995
569 self.proc = Subprocess('ph', 1)
571 raise SubprocessError('failure starting ph: %s' % # ===>
575 """Send a query and return a list of dicts for responses.
577 Raise a ValueError if ph responds with an error."""
581 self.proc.writeline('query ' + q)
584 response = self.getreply() # Should get null on new prompt.
585 errs = self.proc.readPendingErrChars()
587 sys.stderr.write(errs)
593 elif type(response) == types.StringType:
594 raise ValueError("ph failed match: '%s'" % response) # ===>
595 for line in response:
597 line = string.splitfields(line, ':')
598 it[string.strip(line[0])] = (
599 string.strip(string.join(line[1:])))
602 """Consume next response from ph, returning list of lines or string
604 # Key on first char: (First line may lack newline.)
605 # - dash discard line
606 # - 'ph> ' conclusion of response
607 # - number error message
608 # - whitespace beginning of next response
610 nextChar = self.proc.waitForPendingChar(60)
612 raise SubprocessError('ph subprocess not responding') # ===>
613 elif nextChar == '-':
614 # dashed line - discard it, and continue reading:
616 return self.getreply() # ===>
617 elif nextChar == 'p':
618 # 'ph> ' prompt - don't think we should hit this, but what the hay:
620 elif nextChar in '0123456789':
621 # Error notice - we're currently assuming single line errors:
622 return self.proc.readline()[:-1] # ===>
623 elif nextChar in ' \t':
624 # Get content, up to next dashed line:
626 while nextChar != '-' and nextChar != '':
627 got.append(self.proc.readline()[:-1])
628 nextChar = self.proc.peekPendingChar()
631 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
634 """Clear-out initial preface or residual subproc input and output."""
635 pause = .5; maxIter = 10 # 5 seconds to clear
639 while iterations < maxIter:
640 got = got + self.proc.readPendingChars()
641 # Strip out all but the last incomplete line:
642 got = string.splitfields(got, '\n')[-1]
643 if got == 'ph> ': return # Ok. ===>
645 raise SubprocessError('ph not responding within %s secs' %
648 #############################################################################
650 #############################################################################
653 print("\tOpening bogus subprocess, should fail:")
655 b = Subprocess('/', 1)
656 print("\tOops! Null-named subprocess startup *succeeded*?!?")
657 except SubprocessError:
658 print("\t...yep, it failed.")
659 print("\tOpening cat subprocess:")
660 p = Subprocess('cat', 1) # set to expire noisily...
662 print('\tWrite, then read, two newline-teriminated lines, using readline:')
663 p.write('first full line written\n'); p.write('second.\n')
664 print(repr(p.readline()))
665 print(repr(p.readline()))
666 print('\tThree lines, last sans newline, read using combination:')
667 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
668 print('\tFirst line via readline:')
669 print(repr(p.readline()))
670 print('\tRest via readPendingChars:')
671 print(p.readPendingChars())
672 print("\tStopping then continuing subprocess (verbose):")
673 if not p.stop(1): # verbose stop
674 print('\t** Stop seems to have failed!')
676 print('\tWriting line while subprocess is paused...')
677 p.write('written while subprocess paused\n')
678 print('\tNonblocking read of paused subprocess (should be empty):')
679 print(p.readPendingChars())
680 print('\tContinuing subprocess (verbose):')
681 if not p.cont(1): # verbose continue
682 print('\t** Continue seems to have failed! Probly lost subproc...')
685 print('\tReading accumulated line, blocking read:')
687 print("\tDeleting subproc, which was set to die noisily:")
692 if __name__ == '__main__':