3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
8 Subprocess class features:
10 - provides non-blocking stdin and stderr reads
12 - provides subprocess stop and continue, kill-on-deletion
14 - provides detection of subprocess startup failure
16 - Subprocess objects have nice, informative string rep (as every good object
19 __version__ = "Revision: 1.15 "
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 # and subprocesses, primarily one by jose pereira.
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 # because i don't know what all in it is portable and what is not. I'm not
30 # about to provide for different platform contingencies - at that extent, the
31 # effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 # checked on regular IO, when a handler is defined, and passed to the
34 # handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
37 # - new additions (1.3, 1.4):
38 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 # reads based solely on os.read with select, while capitalizing big-time on
40 # multi-char read chunking.
41 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
43 # ken.manheimer@nist.gov
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
49 import sys, os, string, time, types
54 class SubprocessError(Exception):
57 # You may need to increase execvp_grace_seconds, if you have a large or slow
59 execvp_grace_seconds = 0.5
62 """Run and communicate asynchronously with a subprocess.
64 Provides non-blocking reads in the form of .readPendingChars and
67 .readline will block until it gets a complete line.
69 .peekPendingChar does a non-blocking, non-consuming read for pending
70 output, and can be used before .readline to check non-destructively for
71 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
72 a new character is pending, or timeout secs pass, with granularity of
75 There are corresponding read and peekPendingErrXXX routines, to read from
76 the subprocess stderr stream."""
80 expire_noisily = 1 # Announce subproc destruction?
82 readbuf = 0 # fork will assign to be a readbuf obj
83 errbuf = 0 # fork will assign to be a readbuf obj
85 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
86 in_fd=0, out_fd=1, err_fd=2):
87 """Launch a subprocess, given command string COMMAND."""
90 self.expire_noisily = expire_noisily
91 self.control_stderr = control_stderr
92 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
95 def fork(self, cmd=None):
96 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
97 if cmd: self.cmd = cmd
99 cmd = string.split(self.cmd)
100 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
101 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
102 pRe, cWe = os.pipe() # parent-read-error, child-write-error
103 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
107 if self.pid == 0: #### CHILD ####
108 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
109 # Reopen stdin, out, err, on pipe ends:
110 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
111 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
112 if self.control_stderr:
113 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
114 # Ensure (within reason) stray file descriptors are closed:
115 excludes = [self.in_fd, self.out_fd, self.err_fd]
116 for i in range(4,100):
117 if i not in excludes:
119 except os.error: pass
122 os.execvp(cmd[0], cmd)
123 os._exit(1) # Shouldn't get here
125 except os.error as e:
126 if self.control_stderr:
127 os.dup2(parentErr, 2) # Reconnect to parent's stdout
128 sys.stderr.write("**execvp failed, '%s'**\n" %
131 os._exit(1) # Shouldn't get here.
134 # Connect to the child's file descriptors, using our customized
136 self.toChild = os.fdopen(pWc, 'w')
137 self.toChild_fdlist = [pWc]
138 self.readbuf = ReadBuf(pRc)
139 self.errbuf = ReadBuf(pRe)
140 time.sleep(execvp_grace_seconds)
142 pid, err = os.waitpid(self.pid, os.WNOHANG)
143 except os.error as error:
147 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
149 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
151 # child exited already
154 rc = (err & 0xff00) >> 8
156 raise SubprocessError(
157 "child killed by signal %d with a return code of %d"
161 raise SubprocessError(
162 "child exited with return code %d" % rc)
163 # Child may have exited, but not in error, so we won't say
164 # anything more at this point.
166 ### Write input to subprocess ###
168 def write(self, str):
169 """Write a STRING to the subprocess."""
172 raise SubprocessError("no child") # ===>
173 if select.select([],self.toChild_fdlist,[],0)[1]:
174 self.toChild.write(str)
177 # XXX Can write-buffer full be handled better??
178 raise IOError("write to %s blocked" % self) # ===>
180 def writeline(self, line=''):
181 """Write STRING, with added newline termination, to subprocess."""
182 self.write(line + '\n')
184 ### Get output from subprocess ###
186 def peekPendingChar(self):
187 """Return, but (effectively) do not consume a single pending output
188 char, or return null string if none pending."""
190 return self.readbuf.peekPendingChar() # ===>
191 def peekPendingErrChar(self):
192 """Return, but (effectively) do not consume a single pending output
193 char, or return null string if none pending."""
195 return self.errbuf.peekPendingChar() # ===>
197 def waitForPendingChar(self, timeout, pollPause=.1):
198 """Block max TIMEOUT secs until we peek a pending char, returning the
199 char, or '' if none encountered.
201 Pause POLLPAUSE secs (default .1) between polls."""
205 nextChar = self.readbuf.peekPendingChar()
206 if nextChar or (accume > timeout): return nextChar
207 time.sleep(pollPause)
208 accume = accume + pollPause
210 def read(self, n=None):
211 """Read N chars, or all pending if no N specified."""
213 return self.readPendingChars()
216 got0 = self.readPendingChars(n)
220 def readPendingChars(self, max=None):
221 """Read all currently pending subprocess output as a single string."""
222 return self.readbuf.readPendingChars(max)
223 def readPendingErrChars(self):
224 """Read all currently pending subprocess error output as a single
226 if self.control_stderr:
227 return self.errbuf.readPendingChars()
229 raise SubprocessError("Haven't grabbed subprocess error stream.")
231 def readPendingLine(self):
232 """Read currently pending subprocess output, up to a complete line
233 (newline inclusive)."""
234 return self.readbuf.readPendingLine()
235 def readPendingErrLine(self):
236 """Read currently pending subprocess error output, up to a complete
237 line (newline inclusive)."""
238 if self.control_stderr:
239 return self.errbuf.readPendingLine()
241 raise SubprocessError("Haven't grabbed subprocess error stream.")
244 """Return next complete line of subprocess output, blocking until
246 return self.readbuf.readline()
247 def readlineErr(self):
248 """Return next complete line of subprocess error output, blocking until
250 if self.control_stderr:
251 return self.errbuf.readline()
253 raise SubprocessError("Haven't grabbed subprocess error stream.")
255 ### Subprocess Control ###
258 """True if subprocess is alive and kicking."""
259 return self.status(boolean=1)
260 def status(self, boolean=0):
261 """Return string indicating whether process is alive or dead."""
264 status = 'sans command'
266 status = 'sans process'
267 elif not self.cont():
268 status = "(unresponding) '%s'" % self.cmd
270 status = "'%s'" % self.cmd
277 def stop(self, verbose=1):
278 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
281 os.kill(self.pid, signal.SIGSTOP)
284 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
286 if verbose: print("Stopped '%s'" % self.cmd)
289 def cont(self, verbose=0):
290 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
293 os.kill(self.pid, signal.SIGCONT)
296 print(("Continue failed for '%s' - '%s'" %
297 (self.cmd, sys.exc_value)))
299 if verbose: print("Continued '%s'" % self.cmd)
303 """Send process PID signal SIG (default 9, 'kill'), returning None once
304 it is successfully reaped.
306 SubprocessError is raised if process is not successfully killed."""
309 raise SubprocessError("No process") # ===>
310 elif not self.cont():
311 raise SubprocessError("Can't signal subproc %s" % self) # ===>
313 # Try sending first a TERM and then a KILL signal.
315 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
318 os.kill(self.pid, sig[1])
322 # Try a couple or three times to reap the process with waitpid:
324 # WNOHANG == 1 on sunos, presumably same elsewhere.
325 if os.waitpid(self.pid, os.WNOHANG):
326 if self.expire_noisily:
327 print(("\n(%s subproc %d '%s' / %s)" %
328 (sig[0], self.pid, self.cmd,
330 for i in self.pipefiles:
332 fp = os.fdopen(i).close()
335 del self.pipefiles[:]
339 # Only got here if subprocess is not gone:
340 raise SubprocessError(
341 "Failed kill of subproc %d, '%s', with signals %s" %
342 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
345 """Terminate the subprocess"""
350 status = self.status()
351 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
353 #############################################################################
354 ##### Non-blocking read operations #####
355 #############################################################################
358 """Output buffer for non-blocking reads on selectable files like pipes and
359 sockets. Init with a file descriptor for the file."""
361 def __init__(self, fd, maxChunkSize=1024):
362 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
368 self.eof = 0 # May be set with stuff still in .buf
370 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
375 def peekPendingChar(self):
376 """Return, but don't consume, first character of unconsumed output from
377 file, or empty string if none."""
380 return self.buf[0] # ===>
385 sel = select.select([self.fd], [], [self.fd], 0)
389 self.buf = os.read(self.fd, self.chunkSize) # ===>
390 return self.buf[0] # Assume select don't lie.
391 else: return '' # ===>
394 def readPendingChar(self):
395 """Consume first character of unconsumed output from file, or empty
399 got, self.buf = self.buf[0], self.buf[1:]
405 sel = select.select([self.fd], [], [self.fd], 0)
409 return os.read(self.fd, 1) # ===>
410 else: return '' # ===>
412 def readPendingChars(self, max=None):
413 """Consume uncomsumed output from FILE, or empty string if nothing
418 if (max > 0) and (len(self.buf) > max):
419 got = self.buf[0:max]
420 self.buf = self.buf[max:]
422 got, self.buf = self.buf, ''
428 sel = select.select([self.fd], [], [self.fd], 0)
432 got = got + os.read(self.fd, self.chunkSize)
439 self.buf = self.buf + got[max:]
445 def readPendingLine(self, block=0):
446 """Return pending output from FILE, up to first newline (inclusive).
448 Does not block unless optional arg BLOCK is true.
450 Note that an error will be raised if a new eof is encountered without
454 to = string.find(self.buf, '\n')
456 got, self.buf = self.buf[:to+1], self.buf[to+1:]
458 got, self.buf = self.buf, ''
464 # Herein, 'got' contains the (former) contents of the buffer, and it
465 # doesn't include a newline.
467 period = block and 1.0 or 0 # don't be too busy while waiting
468 while 1: # (we'll only loop if block set)
469 sel = select.select(fdlist, [], fdlist, period)
473 got = got + os.read(self.fd, self.chunkSize)
475 to = string.find(got, '\n')
477 got, self.buf = got[:to+1], got[to+1:]
482 self.buf = '' # this is how an ordinary file acts...
484 # otherwise - no newline, blocking requested, no eof - loop. # ==^
487 """Return next output line from file, blocking until it is received."""
489 return self.readPendingLine(1) # ===>
492 #############################################################################
493 ##### Encapsulated reading and writing #####
494 #############################################################################
495 # Encapsulate messages so the end can be unambiguously identified, even
496 # when they contain multiple, possibly empty lines.
499 """Encapsulate stream object for record-oriented IO.
501 Particularly useful when dealing with non-line oriented communications
502 over pipes, eg with subprocesses."""
504 # Message is written preceded by a line containing the message length.
506 def __init__(self, f):
509 def write_record(self, s):
510 "Write so self.read knows exactly how much to read."
511 f = self.__dict__['file']
512 f.write("%s\n%s" % (len(s), s))
513 if hasattr(f, 'flush'):
516 def read_record(self):
517 "Read and reconstruct message as prepared by self.write."
518 f = self.__dict__['file']
519 line = f.readline()[:-1]
522 l = string.atoi(line)
524 raise IOError(("corrupt %s file structure"
525 % self.__class__.__name__))
531 def __getattr__(self, attr):
532 """Implement characteristic file object attributes."""
533 f = self.__dict__['file']
535 return getattr(f, attr)
537 raise AttributeError(attr)
540 return "<%s of %s at %s>" % (self.__class__.__name__,
541 self.__dict__['file'],
545 """Exercise encapsulated write/read with an arbitrary string.
547 Raise IOError if the string gets distorted through transmission!"""
548 from StringIO import StringIO
554 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
556 raise IOError("String distorted:\n%s" % show)
558 #############################################################################
559 ##### An example subprocess interfaces #####
560 #############################################################################
563 """Convenient interface to CCSO 'ph' nameserver subprocess.
565 .query('string...') method takes a query and returns a list of dicts, each
566 of which represents one entry."""
568 # Note that i made this a class that handles a subprocess object, rather
569 # than one that inherits from it. I didn't see any functional
570 # disadvantages, and didn't think that full support of the entire
571 # Subprocess functionality was in any way suitable for interaction with
572 # this specialized interface. ? klm 13-Jan-1995
576 self.proc = Subprocess('ph', 1)
578 raise SubprocessError('failure starting ph: %s' % # ===>
582 """Send a query and return a list of dicts for responses.
584 Raise a ValueError if ph responds with an error."""
588 self.proc.writeline('query ' + q)
591 response = self.getreply() # Should get null on new prompt.
592 errs = self.proc.readPendingErrChars()
594 sys.stderr.write(errs)
600 elif type(response) == types.StringType:
601 raise ValueError("ph failed match: '%s'" % response) # ===>
602 for line in response:
604 line = string.splitfields(line, ':')
605 it[string.strip(line[0])] = (
606 string.strip(string.join(line[1:])))
609 """Consume next response from ph, returning list of lines or string
611 # Key on first char: (First line may lack newline.)
612 # - dash discard line
613 # - 'ph> ' conclusion of response
614 # - number error message
615 # - whitespace beginning of next response
617 nextChar = self.proc.waitForPendingChar(60)
619 raise SubprocessError('ph subprocess not responding') # ===>
620 elif nextChar == '-':
621 # dashed line - discard it, and continue reading:
623 return self.getreply() # ===>
624 elif nextChar == 'p':
625 # 'ph> ' prompt - don't think we should hit this, but what the hay:
627 elif nextChar in '0123456789':
628 # Error notice - we're currently assuming single line errors:
629 return self.proc.readline()[:-1] # ===>
630 elif nextChar in ' \t':
631 # Get content, up to next dashed line:
633 while nextChar != '-' and nextChar != '':
634 got.append(self.proc.readline()[:-1])
635 nextChar = self.proc.peekPendingChar()
638 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
641 """Clear-out initial preface or residual subproc input and output."""
642 pause = .5; maxIter = 10 # 5 seconds to clear
646 while iterations < maxIter:
647 got = got + self.proc.readPendingChars()
648 # Strip out all but the last incomplete line:
649 got = string.splitfields(got, '\n')[-1]
650 if got == 'ph> ': return # Ok. ===>
652 raise SubprocessError('ph not responding within %s secs' %
655 #############################################################################
657 #############################################################################
660 print("\tOpening bogus subprocess, should fail:")
662 b = Subprocess('/', 1)
663 print("\tOops! Null-named subprocess startup *succeeded*?!?")
664 except SubprocessError:
665 print("\t...yep, it failed.")
666 print("\tOpening cat subprocess:")
667 p = Subprocess('cat', 1) # set to expire noisily...
669 print('\tWrite, then read, two newline-teriminated lines, using readline:')
670 p.write('first full line written\n'); p.write('second.\n')
671 print(repr(p.readline()))
672 print(repr(p.readline()))
673 print('\tThree lines, last sans newline, read using combination:')
674 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
675 print('\tFirst line via readline:')
676 print(repr(p.readline()))
677 print('\tRest via readPendingChars:')
678 print(p.readPendingChars())
679 print("\tStopping then continuing subprocess (verbose):")
680 if not p.stop(1): # verbose stop
681 print('\t** Stop seems to have failed!')
683 print('\tWriting line while subprocess is paused...')
684 p.write('written while subprocess paused\n')
685 print('\tNonblocking read of paused subprocess (should be empty):')
686 print(p.readPendingChars())
687 print('\tContinuing subprocess (verbose):')
688 if not p.cont(1): # verbose continue
689 print('\t** Continue seems to have failed! Probly lost subproc...')
692 print('\tReading accumulated line, blocking read:')
694 print("\tDeleting subproc, which was set to die noisily:")
699 if __name__ == '__main__':