1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 Subprocess class features:
8 - provides non-blocking stdin and stderr reads
10 - provides subprocess stop and continue, kill-on-deletion
12 - provides detection of subprocess startup failure
14 - Subprocess objects have nice, informative string rep (as every good object
17 __version__ = "Revision: 1.15 "
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
22 # Prior art: Initially based on python code examples demonstrating usage of pipes
23 # and subprocesses, primarily one by jose pereira.
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 # because i don't know what all in it is portable and what is not. I'm not
28 # about to provide for different platform contingencies - at that extent, the
29 # effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 # checked on regular IO, when a handler is defined, and passed to the
32 # handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
35 # - new additions (1.3, 1.4):
36 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 # reads based solely on os.read with select, while capitalizing big-time on
38 # multi-char read chunking.
39 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
41 # ken.manheimer@nist.gov
44 import sys, os, string, time, types
class SubprocessError(Exception):
    """Error raised for subprocess startup, signalling, and IO-setup failures.

    This used to be the plain string 'SubprocessError'.  Every raise site in
    this module has the form ``raise SubprocessError("message")``, which
    *calls* the object, so a string here turns each intended error into
    ``TypeError: 'str' object is not callable``.  A real exception class
    makes those raises (and ``except SubprocessError:``) behave as written.
    """
50 # You may need to increase execvp_grace_seconds, if you have a large or slow
52 execvp_grace_seconds = 0.5
55 """Run and communicate asynchronously with a subprocess.
57 Provides non-blocking reads in the form of .readPendingChars and
60 .readline will block until it gets a complete line.
62 .peekPendingChar does a non-blocking, non-consuming read for pending
63 output, and can be used before .readline to check non-destructively for
64 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
65 a new character is pending, or timeout secs pass, with granularity of
68 There are corresponding read and peekPendingErrXXX routines, to read from
69 the subprocess stderr stream."""
73 expire_noisily = 1 # Announce subproc destruction?
75 readbuf = 0 # fork will assign to be a readbuf obj
76 errbuf = 0 # fork will assign to be a readbuf obj
78 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
79 in_fd=0, out_fd=1, err_fd=2):
80 """Launch a subprocess, given command string COMMAND."""
83 self.expire_noisily = expire_noisily
84 self.control_stderr = control_stderr
85 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
88 def fork(self, cmd=None):
89 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
90 if cmd: self.cmd = cmd
92 cmd = string.split(self.cmd)
93 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
94 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
95 pRe, cWe = os.pipe() # parent-read-error, child-write-error
96 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
100 if self.pid == 0: #### CHILD ####
101 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
102 # Reopen stdin, out, err, on pipe ends:
103 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
104 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
105 if self.control_stderr:
106 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
107 # Ensure (within reason) stray file descriptors are closed:
108 excludes = [self.in_fd, self.out_fd, self.err_fd]
109 for i in range(4,100):
110 if i not in excludes:
112 except os.error: pass
115 os.execvp(cmd[0], cmd)
116 os._exit(1) # Shouldn't get here
118 except os.error as e:
119 if self.control_stderr:
120 os.dup2(parentErr, 2) # Reconnect to parent's stdout
121 sys.stderr.write("**execvp failed, '%s'**\n" %
124 os._exit(1) # Shouldn't get here.
127 # Connect to the child's file descriptors, using our customized
129 self.toChild = os.fdopen(pWc, 'w')
130 self.toChild_fdlist = [pWc]
131 self.readbuf = ReadBuf(pRc)
132 self.errbuf = ReadBuf(pRe)
133 time.sleep(execvp_grace_seconds)
135 pid, err = os.waitpid(self.pid, os.WNOHANG)
136 except os.error as error:
139 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
140 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
142 # child exited already
145 rc = (err & 0xff00) >> 8
147 raise SubprocessError(
148 "child killed by signal %d with a return code of %d"
151 raise SubprocessError(
152 "child exited with return code %d" % rc)
153 # Child may have exited, but not in error, so we won't say
154 # anything more at this point.
156 ### Write input to subprocess ###
158 def write(self, str):
159 """Write a STRING to the subprocess."""
162 raise SubprocessError("no child") # ===>
163 if select.select([],self.toChild_fdlist,[],0)[1]:
164 self.toChild.write(str)
167 # XXX Can write-buffer full be handled better??
168 raise IOError("write to %s blocked" % self) # ===>
def writeline(self, line=''):
    """Send LINE to the subprocess with a newline appended.

    Delegates to self.write; with no argument a bare newline is sent.
    """
    newline = '\n'
    payload = line + newline
    self.write(payload)
174 ### Get output from subprocess ###
def peekPendingChar(self):
    """Peek at the next pending char of subprocess output.

    The char is (effectively) not consumed; the null string is returned
    when nothing is pending.  The work is done by self.readbuf.
    """
    out_buffer = self.readbuf
    return out_buffer.peekPendingChar()
def peekPendingErrChar(self):
    """Peek at the next pending char held by the error buffer.

    The char is (effectively) not consumed; the null string is returned
    when nothing is pending.  The work is done by self.errbuf.
    """
    err_buffer = self.errbuf
    return err_buffer.peekPendingChar()
187 def waitForPendingChar(self, timeout, pollPause=.1):
188 """Block max TIMEOUT secs until we peek a pending char, returning the
189 char, or '' if none encountered.
191 Pause POLLPAUSE secs (default .1) between polls."""
195 nextChar = self.readbuf.peekPendingChar()
196 if nextChar or (accume > timeout): return nextChar
197 time.sleep(pollPause)
198 accume = accume + pollPause
200 def read(self, n=None):
201 """Read N chars, or all pending if no N specified."""
203 return self.readPendingChars()
206 got0 = self.readPendingChars(n)
def readPendingChars(self, max=None):
    """Return currently pending subprocess output as one string.

    MAX is handed through unchanged to self.readbuf.readPendingChars.
    """
    limit = max
    out_buffer = self.readbuf
    return out_buffer.readPendingChars(limit)
def readPendingErrChars(self):
    """Return currently pending subprocess error output as one string.

    Raises SubprocessError when the error stream was not taken over
    (self.control_stderr is false).
    """
    if not self.control_stderr:
        raise SubprocessError("Haven't grabbed subprocess error stream.")
    return self.errbuf.readPendingChars()
def readPendingLine(self):
    """Return pending subprocess output up to and including a newline.

    Delegates to self.readbuf.readPendingLine.
    """
    out_buffer = self.readbuf
    return out_buffer.readPendingLine()
def readPendingErrLine(self):
    """Return pending subprocess error output up to and including a newline.

    Raises SubprocessError when the error stream was not taken over
    (self.control_stderr is false).
    """
    if not self.control_stderr:
        raise SubprocessError("Haven't grabbed subprocess error stream.")
    return self.errbuf.readPendingLine()
234 """Return next complete line of subprocess output, blocking until
236 return self.readbuf.readline()
def readlineErr(self):
    """Return the next complete line of subprocess error output.

    Delegates to self.errbuf.readline, which blocks until the line is
    received.  Raises SubprocessError when the error stream was not taken
    over (self.control_stderr is false).
    """
    if not self.control_stderr:
        raise SubprocessError("Haven't grabbed subprocess error stream.")
    return self.errbuf.readline()
245 ### Subprocess Control ###
248 """True if subprocess is alive and kicking."""
249 return self.status(boolean=1)
250 def status(self, boolean=0):
251 """Return string indicating whether process is alive or dead."""
254 status = 'sans command'
256 status = 'sans process'
257 elif not self.cont():
258 status = "(unresponding) '%s'" % self.cmd
260 status = "'%s'" % self.cmd
def stop(self, verbose=1):
    """Signal the subprocess with SIGSTOP.

    Returns 'stopped' if the signal was delivered, or 0 if os.kill
    failed.  When VERBOSE is true the outcome is printed.
    """
    try:
        os.kill(self.pid, signal.SIGSTOP)
    except os.error as exc:
        # Bind the exception directly: sys.exc_value does not exist on
        # Python 3, so reading it here would raise AttributeError and
        # mask the failure we are trying to report.
        if verbose:
            print("Stop failed for '%s' - '%s'" % (self.cmd, exc))
        return 0
    if verbose:
        print("Stopped '%s'" % self.cmd)
    return 'stopped'
def cont(self, verbose=0):
    """Signal the subprocess with SIGCONT.

    Returns 'continued' if the signal was delivered, or 0 if os.kill
    failed.  When VERBOSE is true the outcome is printed.
    """
    try:
        os.kill(self.pid, signal.SIGCONT)
    except os.error as exc:
        # Bind the exception directly: sys.exc_value does not exist on
        # Python 3, so reading it here would raise AttributeError and
        # mask the failure we are trying to report.
        if verbose:
            print("Continue failed for '%s' - '%s'" % (self.cmd, exc))
        return 0
    if verbose:
        print("Continued '%s'" % self.cmd)
    return 'continued'
293 """Send process PID signal SIG (default 9, 'kill'), returning None once
294 it is successfully reaped.
296 SubprocessError is raised if process is not successfully killed."""
299 raise SubprocessError("No process") # ===>
300 elif not self.cont():
301 raise SubprocessError("Can't signal subproc %s" % self) # ===>
303 # Try sending first a TERM and then a KILL signal.
305 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
308 os.kill(self.pid, sig[1])
312 # Try a couple or three times to reap the process with waitpid:
314 # WNOHANG == 1 on sunos, presumably same elsewhere.
315 if os.waitpid(self.pid, os.WNOHANG):
316 if self.expire_noisily:
317 print(("\n(%s subproc %d '%s' / %s)" %
318 (sig[0], self.pid, self.cmd,
320 for i in self.pipefiles:
325 # Only got here if subprocess is not gone:
326 raise SubprocessError(
327 "Failed kill of subproc %d, '%s', with signals %s" %
328 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
331 """Terminate the subprocess"""
336 status = self.status()
337 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
339 #############################################################################
340 ##### Non-blocking read operations #####
341 #############################################################################
344 """Output buffer for non-blocking reads on selectable files like pipes and
345 sockets. Init with a file descriptor for the file."""
347 def __init__(self, fd, maxChunkSize=1024):
348 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
354 self.eof = 0 # May be set with stuff still in .buf
356 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
361 def peekPendingChar(self):
362 """Return, but don't consume, first character of unconsumed output from
363 file, or empty string if none."""
366 return self.buf[0] # ===>
371 sel = select.select([self.fd], [], [self.fd], 0)
375 self.buf = os.read(self.fd, self.chunkSize) # ===>
376 return self.buf[0] # Assume select don't lie.
377 else: return '' # ===>
380 def readPendingChar(self):
381 """Consume first character of unconsumed output from file, or empty
385 got, self.buf = self.buf[0], self.buf[1:]
391 sel = select.select([self.fd], [], [self.fd], 0)
395 return os.read(self.fd, 1) # ===>
396 else: return '' # ===>
398 def readPendingChars(self, max=None):
399 """Consume uncomsumed output from FILE, or empty string if nothing
404 if (max > 0) and (len(self.buf) > max):
405 got = self.buf[0:max]
406 self.buf = self.buf[max:]
408 got, self.buf = self.buf, ''
414 sel = select.select([self.fd], [], [self.fd], 0)
418 got = got + os.read(self.fd, self.chunkSize)
425 self.buf = self.buf + got[max:]
431 def readPendingLine(self, block=0):
432 """Return pending output from FILE, up to first newline (inclusive).
434 Does not block unless optional arg BLOCK is true.
436 Note that an error will be raised if a new eof is encountered without
440 to = string.find(self.buf, '\n')
442 got, self.buf = self.buf[:to+1], self.buf[to+1:]
444 got, self.buf = self.buf, ''
450 # Herein, 'got' contains the (former) contents of the buffer, and it
451 # doesn't include a newline.
453 period = block and 1.0 or 0 # don't be too busy while waiting
454 while 1: # (we'll only loop if block set)
455 sel = select.select(fdlist, [], fdlist, period)
459 got = got + os.read(self.fd, self.chunkSize)
461 to = string.find(got, '\n')
463 got, self.buf = got[:to+1], got[to+1:]
468 self.buf = '' # this is how an ordinary file acts...
470 # otherwise - no newline, blocking requested, no eof - loop. # ==^
473 """Return next output line from file, blocking until it is received."""
475 return self.readPendingLine(1) # ===>
478 #############################################################################
479 ##### Encapsulated reading and writing #####
480 #############################################################################
481 # Encapsulate messages so the end can be unambiguously identified, even
482 # when they contain multiple, possibly empty lines.
485 """Encapsulate stream object for record-oriented IO.
487 Particularly useful when dealing with non-line oriented communications
488 over pipes, eg with subprocesses."""
490 # Message is written preceded by a line containing the message length.
492 def __init__(self, f):
495 def write_record(self, s):
496 "Write so self.read knows exactly how much to read."
497 f = self.__dict__['file']
498 f.write("%s\n%s" % (len(s), s))
499 if hasattr(f, 'flush'):
502 def read_record(self):
503 "Read and reconstruct message as prepared by self.write."
504 f = self.__dict__['file']
505 line = f.readline()[:-1]
508 l = string.atoi(line)
510 raise IOError(("corrupt %s file structure"
511 % self.__class__.__name__))
517 def __getattr__(self, attr):
518 """Implement characteristic file object attributes."""
519 f = self.__dict__['file']
521 return getattr(f, attr)
523 raise AttributeError(attr)
526 return "<%s of %s at %s>" % (self.__class__.__name__,
527 self.__dict__['file'],
531 """Exercise encapsulated write/read with an arbitrary string.
533 Raise IOError if the string gets distorted through transmission!"""
534 from StringIO import StringIO
540 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
542 raise IOError("String distorted:\n%s" % show)
544 #############################################################################
545 ##### An example subprocess interfaces #####
546 #############################################################################
549 """Convenient interface to CCSO 'ph' nameserver subprocess.
551 .query('string...') method takes a query and returns a list of dicts, each
552 of which represents one entry."""
554 # Note that i made this a class that handles a subprocess object, rather
555 # than one that inherits from it. I didn't see any functional
556 # disadvantages, and didn't think that full support of the entire
557 # Subprocess functionality was in any way suitable for interaction with
558 # this specialized interface. ? klm 13-Jan-1995
562 self.proc = Subprocess('ph', 1)
564 raise SubprocessError('failure starting ph: %s' % # ===>
568 """Send a query and return a list of dicts for responses.
570 Raise a ValueError if ph responds with an error."""
574 self.proc.writeline('query ' + q)
577 response = self.getreply() # Should get null on new prompt.
578 errs = self.proc.readPendingErrChars()
580 sys.stderr.write(errs)
586 elif type(response) == types.StringType:
587 raise ValueError("ph failed match: '%s'" % response) # ===>
588 for line in response:
590 line = string.splitfields(line, ':')
591 it[string.strip(line[0])] = (
592 string.strip(string.join(line[1:])))
595 """Consume next response from ph, returning list of lines or string
597 # Key on first char: (First line may lack newline.)
598 # - dash discard line
599 # - 'ph> ' conclusion of response
600 # - number error message
601 # - whitespace beginning of next response
603 nextChar = self.proc.waitForPendingChar(60)
605 raise SubprocessError('ph subprocess not responding') # ===>
606 elif nextChar == '-':
607 # dashed line - discard it, and continue reading:
609 return self.getreply() # ===>
610 elif nextChar == 'p':
611 # 'ph> ' prompt - don't think we should hit this, but what the hay:
613 elif nextChar in '0123456789':
614 # Error notice - we're currently assuming single line errors:
615 return self.proc.readline()[:-1] # ===>
616 elif nextChar in ' \t':
617 # Get content, up to next dashed line:
619 while nextChar != '-' and nextChar != '':
620 got.append(self.proc.readline()[:-1])
621 nextChar = self.proc.peekPendingChar()
624 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
627 """Clear-out initial preface or residual subproc input and output."""
628 pause = .5; maxIter = 10 # 5 seconds to clear
632 while iterations < maxIter:
633 got = got + self.proc.readPendingChars()
634 # Strip out all but the last incomplete line:
635 got = string.splitfields(got, '\n')[-1]
636 if got == 'ph> ': return # Ok. ===>
638 raise SubprocessError('ph not responding within %s secs' %
641 #############################################################################
643 #############################################################################
646 print("\tOpening subprocess:")
647 p = Subprocess('cat', 1) # set to expire noisily...
649 print("\tOpening bogus subprocess, should fail:")
651 b = Subprocess('/', 1)
652 print("\tOops! Null-named subprocess startup *succeeded*?!?")
653 except SubprocessError:
654 print("\t...yep, it failed.")
655 print('\tWrite, then read, two newline-teriminated lines, using readline:')
656 p.write('first full line written\n'); p.write('second.\n')
657 print(repr(p.readline()))
658 print(repr(p.readline()))
659 print('\tThree lines, last sans newline, read using combination:')
660 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
661 print('\tFirst line via readline:')
662 print(repr(p.readline()))
663 print('\tRest via readPendingChars:')
664 print(p.readPendingChars())
665 print("\tStopping then continuing subprocess (verbose):")
666 if not p.stop(1): # verbose stop
667 print('\t** Stop seems to have failed!')
669 print('\tWriting line while subprocess is paused...')
670 p.write('written while subprocess paused\n')
671 print('\tNonblocking read of paused subprocess (should be empty):')
672 print(p.readPendingChars())
673 print('\tContinuing subprocess (verbose):')
674 if not p.cont(1): # verbose continue
675 print('\t** Continue seems to have failed! Probly lost subproc...')
678 print('\tReading accumulated line, blocking read:')
680 print("\tDeleting subproc, which was set to die noisily:")