1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 Subprocess class features:
8 - provides non-blocking stdin and stderr reads
10 - provides subprocess stop and continue, kill-on-deletion
12 - provides detection of subprocess startup failure
14 - Subprocess objects have nice, informative string rep (as every good object
17 __version__ = "Revision: 1.15 "
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 # and subprocesses, primarily one by jose pereira.
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 # because i don't know what all in it is portable and what is not. I'm not
28 # about to provide for different platform contingencies - at that extent, the
29 # effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 # checked on regular IO, when a handler is defined, and passed to the
32 # handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
35 # - new additions (1.3, 1.4):
36 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 # reads based solely on os.read with select, while capitalizing big-time on
38 # multi-char read chunking.
39 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
41 # ken.manheimer@nist.gov
43 # This is a modified version by Oleg Broytman <phd@phdru.name>.
44 # The original version is still preserved at
45 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
47 import sys, os, string, time, types
52 class SubprocessError(Exception):
55 # You may need to increase execvp_grace_seconds, if you have a large or slow
57 execvp_grace_seconds = 0.5
60 """Run and communicate asynchronously with a subprocess.
62 Provides non-blocking reads in the form of .readPendingChars and
65 .readline will block until it gets a complete line.
67 .peekPendingChar does a non-blocking, non-consuming read for pending
68 output, and can be used before .readline to check non-destructively for
69 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
70 a new character is pending, or timeout secs pass, with granularity of
73 There are corresponding read and peekPendingErrXXX routines, to read from
74 the subprocess stderr stream."""
78 expire_noisily = 1 # Announce subproc destruction?
80 readbuf = 0 # fork will assign to be a readbuf obj
81 errbuf = 0 # fork will assign to be a readbuf obj
83 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
84 in_fd=0, out_fd=1, err_fd=2):
85 """Launch a subprocess, given command string COMMAND."""
88 self.expire_noisily = expire_noisily
89 self.control_stderr = control_stderr
90 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
93 def fork(self, cmd=None):
94 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
95 if cmd: self.cmd = cmd
97 cmd = string.split(self.cmd)
98 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
99 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
100 pRe, cWe = os.pipe() # parent-read-error, child-write-error
101 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
105 if self.pid == 0: #### CHILD ####
106 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
107 # Reopen stdin, out, err, on pipe ends:
108 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
109 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
110 if self.control_stderr:
111 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
112 # Ensure (within reason) stray file descriptors are closed:
113 excludes = [self.in_fd, self.out_fd, self.err_fd]
114 for i in range(4,100):
115 if i not in excludes:
117 except os.error: pass
120 os.execvp(cmd[0], cmd)
121 os._exit(1) # Shouldn't get here
123 except os.error as e:
124 if self.control_stderr:
125 os.dup2(parentErr, 2) # Reconnect to parent's stdout
126 sys.stderr.write("**execvp failed, '%s'**\n" %
129 os._exit(1) # Shouldn't get here.
132 # Connect to the child's file descriptors, using our customized
134 self.toChild = os.fdopen(pWc, 'w')
135 self.toChild_fdlist = [pWc]
136 self.readbuf = ReadBuf(pRc)
137 self.errbuf = ReadBuf(pRe)
138 time.sleep(execvp_grace_seconds)
140 pid, err = os.waitpid(self.pid, os.WNOHANG)
141 except os.error as error:
144 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
145 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
147 # child exited already
150 rc = (err & 0xff00) >> 8
152 raise SubprocessError(
153 "child killed by signal %d with a return code of %d"
156 raise SubprocessError(
157 "child exited with return code %d" % rc)
158 # Child may have exited, but not in error, so we won't say
159 # anything more at this point.
161 ### Write input to subprocess ###
163 def write(self, str):
164 """Write a STRING to the subprocess."""
167 raise SubprocessError("no child") # ===>
168 if select.select([],self.toChild_fdlist,[],0)[1]:
169 self.toChild.write(str)
172 # XXX Can write-buffer full be handled better??
173 raise IOError("write to %s blocked" % self) # ===>
175 def writeline(self, line=''):
176 """Write STRING, with added newline termination, to subprocess."""
177 self.write(line + '\n')
179 ### Get output from subprocess ###
181 def peekPendingChar(self):
182 """Return, but (effectively) do not consume a single pending output
183 char, or return null string if none pending."""
185 return self.readbuf.peekPendingChar() # ===>
186 def peekPendingErrChar(self):
187 """Return, but (effectively) do not consume a single pending output
188 char, or return null string if none pending."""
190 return self.errbuf.peekPendingChar() # ===>
192 def waitForPendingChar(self, timeout, pollPause=.1):
193 """Block max TIMEOUT secs until we peek a pending char, returning the
194 char, or '' if none encountered.
196 Pause POLLPAUSE secs (default .1) between polls."""
200 nextChar = self.readbuf.peekPendingChar()
201 if nextChar or (accume > timeout): return nextChar
202 time.sleep(pollPause)
203 accume = accume + pollPause
205 def read(self, n=None):
206 """Read N chars, or all pending if no N specified."""
208 return self.readPendingChars()
211 got0 = self.readPendingChars(n)
215 def readPendingChars(self, max=None):
216 """Read all currently pending subprocess output as a single string."""
217 return self.readbuf.readPendingChars(max)
218 def readPendingErrChars(self):
219 """Read all currently pending subprocess error output as a single
221 if self.control_stderr:
222 return self.errbuf.readPendingChars()
224 raise SubprocessError("Haven't grabbed subprocess error stream.")
226 def readPendingLine(self):
227 """Read currently pending subprocess output, up to a complete line
228 (newline inclusive)."""
229 return self.readbuf.readPendingLine()
230 def readPendingErrLine(self):
231 """Read currently pending subprocess error output, up to a complete
232 line (newline inclusive)."""
233 if self.control_stderr:
234 return self.errbuf.readPendingLine()
236 raise SubprocessError("Haven't grabbed subprocess error stream.")
239 """Return next complete line of subprocess output, blocking until
241 return self.readbuf.readline()
242 def readlineErr(self):
243 """Return next complete line of subprocess error output, blocking until
245 if self.control_stderr:
246 return self.errbuf.readline()
248 raise SubprocessError("Haven't grabbed subprocess error stream.")
250 ### Subprocess Control ###
253 """True if subprocess is alive and kicking."""
254 return self.status(boolean=1)
255 def status(self, boolean=0):
256 """Return string indicating whether process is alive or dead."""
259 status = 'sans command'
261 status = 'sans process'
262 elif not self.cont():
263 status = "(unresponding) '%s'" % self.cmd
265 status = "'%s'" % self.cmd
272 def stop(self, verbose=1):
273 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
276 os.kill(self.pid, signal.SIGSTOP)
279 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
281 if verbose: print("Stopped '%s'" % self.cmd)
284 def cont(self, verbose=0):
285 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
288 os.kill(self.pid, signal.SIGCONT)
291 print(("Continue failed for '%s' - '%s'" %
292 (self.cmd, sys.exc_value)))
294 if verbose: print("Continued '%s'" % self.cmd)
298 """Send process PID signal SIG (default 9, 'kill'), returning None once
299 it is successfully reaped.
301 SubprocessError is raised if process is not successfully killed."""
304 raise SubprocessError("No process") # ===>
305 elif not self.cont():
306 raise SubprocessError("Can't signal subproc %s" % self) # ===>
308 # Try sending first a TERM and then a KILL signal.
310 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
313 os.kill(self.pid, sig[1])
317 # Try a couple or three times to reap the process with waitpid:
319 # WNOHANG == 1 on sunos, presumably same elsewhere.
320 if os.waitpid(self.pid, os.WNOHANG):
321 if self.expire_noisily:
322 print(("\n(%s subproc %d '%s' / %s)" %
323 (sig[0], self.pid, self.cmd,
325 for i in self.pipefiles:
330 # Only got here if subprocess is not gone:
331 raise SubprocessError(
332 "Failed kill of subproc %d, '%s', with signals %s" %
333 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
336 """Terminate the subprocess"""
341 status = self.status()
342 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
344 #############################################################################
345 ##### Non-blocking read operations #####
346 #############################################################################
349 """Output buffer for non-blocking reads on selectable files like pipes and
350 sockets. Init with a file descriptor for the file."""
352 def __init__(self, fd, maxChunkSize=1024):
353 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
359 self.eof = 0 # May be set with stuff still in .buf
361 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
366 def peekPendingChar(self):
367 """Return, but don't consume, first character of unconsumed output from
368 file, or empty string if none."""
371 return self.buf[0] # ===>
376 sel = select.select([self.fd], [], [self.fd], 0)
380 self.buf = os.read(self.fd, self.chunkSize) # ===>
381 return self.buf[0] # Assume select don't lie.
382 else: return '' # ===>
385 def readPendingChar(self):
386 """Consume first character of unconsumed output from file, or empty
390 got, self.buf = self.buf[0], self.buf[1:]
396 sel = select.select([self.fd], [], [self.fd], 0)
400 return os.read(self.fd, 1) # ===>
401 else: return '' # ===>
403 def readPendingChars(self, max=None):
404 """Consume uncomsumed output from FILE, or empty string if nothing
409 if (max > 0) and (len(self.buf) > max):
410 got = self.buf[0:max]
411 self.buf = self.buf[max:]
413 got, self.buf = self.buf, ''
419 sel = select.select([self.fd], [], [self.fd], 0)
423 got = got + os.read(self.fd, self.chunkSize)
430 self.buf = self.buf + got[max:]
436 def readPendingLine(self, block=0):
437 """Return pending output from FILE, up to first newline (inclusive).
439 Does not block unless optional arg BLOCK is true.
441 Note that an error will be raised if a new eof is encountered without
445 to = string.find(self.buf, '\n')
447 got, self.buf = self.buf[:to+1], self.buf[to+1:]
449 got, self.buf = self.buf, ''
455 # Herein, 'got' contains the (former) contents of the buffer, and it
456 # doesn't include a newline.
458 period = block and 1.0 or 0 # don't be too busy while waiting
459 while 1: # (we'll only loop if block set)
460 sel = select.select(fdlist, [], fdlist, period)
464 got = got + os.read(self.fd, self.chunkSize)
466 to = string.find(got, '\n')
468 got, self.buf = got[:to+1], got[to+1:]
473 self.buf = '' # this is how an ordinary file acts...
475 # otherwise - no newline, blocking requested, no eof - loop. # ==^
478 """Return next output line from file, blocking until it is received."""
480 return self.readPendingLine(1) # ===>
483 #############################################################################
484 ##### Encapsulated reading and writing #####
485 #############################################################################
486 # Encapsulate messages so the end can be unambiguously identified, even
487 # when they contain multiple, possibly empty lines.
490 """Encapsulate stream object for record-oriented IO.
492 Particularly useful when dealing with non-line oriented communications
493 over pipes, eg with subprocesses."""
495 # Message is written preceded by a line containing the message length.
497 def __init__(self, f):
500 def write_record(self, s):
501 "Write so self.read knows exactly how much to read."
502 f = self.__dict__['file']
503 f.write("%s\n%s" % (len(s), s))
504 if hasattr(f, 'flush'):
507 def read_record(self):
508 "Read and reconstruct message as prepared by self.write."
509 f = self.__dict__['file']
510 line = f.readline()[:-1]
513 l = string.atoi(line)
515 raise IOError(("corrupt %s file structure"
516 % self.__class__.__name__))
522 def __getattr__(self, attr):
523 """Implement characteristic file object attributes."""
524 f = self.__dict__['file']
526 return getattr(f, attr)
528 raise AttributeError(attr)
531 return "<%s of %s at %s>" % (self.__class__.__name__,
532 self.__dict__['file'],
536 """Exercise encapsulated write/read with an arbitrary string.
538 Raise IOError if the string gets distorted through transmission!"""
539 from StringIO import StringIO
545 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
547 raise IOError("String distorted:\n%s" % show)
549 #############################################################################
550 ##### An example subprocess interfaces #####
551 #############################################################################
554 """Convenient interface to CCSO 'ph' nameserver subprocess.
556 .query('string...') method takes a query and returns a list of dicts, each
557 of which represents one entry."""
559 # Note that i made this a class that handles a subprocess object, rather
560 # than one that inherits from it. I didn't see any functional
561 # disadvantages, and didn't think that full support of the entire
562 # Subprocess functionality was in any way suitable for interaction with
563 # this specialized interface. ? klm 13-Jan-1995
567 self.proc = Subprocess('ph', 1)
569 raise SubprocessError('failure starting ph: %s' % # ===>
573 """Send a query and return a list of dicts for responses.
575 Raise a ValueError if ph responds with an error."""
579 self.proc.writeline('query ' + q)
582 response = self.getreply() # Should get null on new prompt.
583 errs = self.proc.readPendingErrChars()
585 sys.stderr.write(errs)
591 elif type(response) == types.StringType:
592 raise ValueError("ph failed match: '%s'" % response) # ===>
593 for line in response:
595 line = string.splitfields(line, ':')
596 it[string.strip(line[0])] = (
597 string.strip(string.join(line[1:])))
600 """Consume next response from ph, returning list of lines or string
602 # Key on first char: (First line may lack newline.)
603 # - dash discard line
604 # - 'ph> ' conclusion of response
605 # - number error message
606 # - whitespace beginning of next response
608 nextChar = self.proc.waitForPendingChar(60)
610 raise SubprocessError('ph subprocess not responding') # ===>
611 elif nextChar == '-':
612 # dashed line - discard it, and continue reading:
614 return self.getreply() # ===>
615 elif nextChar == 'p':
616 # 'ph> ' prompt - don't think we should hit this, but what the hay:
618 elif nextChar in '0123456789':
619 # Error notice - we're currently assuming single line errors:
620 return self.proc.readline()[:-1] # ===>
621 elif nextChar in ' \t':
622 # Get content, up to next dashed line:
624 while nextChar != '-' and nextChar != '':
625 got.append(self.proc.readline()[:-1])
626 nextChar = self.proc.peekPendingChar()
629 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
632 """Clear-out initial preface or residual subproc input and output."""
633 pause = .5; maxIter = 10 # 5 seconds to clear
637 while iterations < maxIter:
638 got = got + self.proc.readPendingChars()
639 # Strip out all but the last incomplete line:
640 got = string.splitfields(got, '\n')[-1]
641 if got == 'ph> ': return # Ok. ===>
643 raise SubprocessError('ph not responding within %s secs' %
646 #############################################################################
648 #############################################################################
651 print("\tOpening subprocess:")
652 p = Subprocess('cat', 1) # set to expire noisily...
654 print("\tOpening bogus subprocess, should fail:")
656 b = Subprocess('/', 1)
657 print("\tOops! Null-named subprocess startup *succeeded*?!?")
658 except SubprocessError:
659 print("\t...yep, it failed.")
660 print('\tWrite, then read, two newline-teriminated lines, using readline:')
661 p.write('first full line written\n'); p.write('second.\n')
662 print(repr(p.readline()))
663 print(repr(p.readline()))
664 print('\tThree lines, last sans newline, read using combination:')
665 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
666 print('\tFirst line via readline:')
667 print(repr(p.readline()))
668 print('\tRest via readPendingChars:')
669 print(p.readPendingChars())
670 print("\tStopping then continuing subprocess (verbose):")
671 if not p.stop(1): # verbose stop
672 print('\t** Stop seems to have failed!')
674 print('\tWriting line while subprocess is paused...')
675 p.write('written while subprocess paused\n')
676 print('\tNonblocking read of paused subprocess (should be empty):')
677 print(p.readPendingChars())
678 print('\tContinuing subprocess (verbose):')
679 if not p.cont(1): # verbose continue
680 print('\t** Continue seems to have failed! Probly lost subproc...')
683 print('\tReading accumulated line, blocking read:')
685 print("\tDeleting subproc, which was set to die noisily:")