3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
8 Subprocess class features:
10 - provides non-blocking stdin and stderr reads
12 - provides subprocess stop and continue, kill-on-deletion
14 - provides detection of subprocess startup failure
16 - Subprocess objects have nice, informative string rep (as every good object
19 __version__ = "Revision: 2.0 "
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 # and subprocesses, primarily one by jose pereira.
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 # because i don't know what all in it is portable and what is not. I'm not
30 # about to provide for different platform contingencies - at that extent, the
31 # effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 # checked on regular IO, when a handler is defined, and passed to the
34 # handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
37 # - new additions (1.3, 1.4):
38 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 # reads based solely on os.read with select, while capitalizing big-time on
40 # multi-char read chunking.
41 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
43 # ken.manheimer@nist.gov
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
57 class SubprocessError(Exception):
61 # You may need to increase execvp_grace_seconds, if you have a large or slow
63 execvp_grace_seconds = 0.5
67 """Run and communicate asynchronously with a subprocess.
69 Provides non-blocking reads in the form of .readPendingChars and
72 .readline will block until it gets a complete line.
74 .peekPendingChar does a non-blocking, non-consuming read for pending
75 output, and can be used before .readline to check non-destructively for
76 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
77 a new character is pending, or timeout secs pass, with granularity of
80 There are corresponding read and peekPendingErrXXX routines, to read from
81 the subprocess stderr stream."""
85 expire_noisily = 1 # Announce subproc destruction?
87 readbuf = 0 # fork will assign to be a readbuf obj
88 errbuf = 0 # fork will assign to be a readbuf obj
90 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
91 in_fd=0, out_fd=1, err_fd=2):
92 """Launch a subprocess, given command string COMMAND."""
95 self.expire_noisily = expire_noisily
96 self.control_stderr = control_stderr
97 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
100 def fork(self, cmd=None):
101 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
104 cmd = string.split(self.cmd)
105 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
106 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
107 pRe, cWe = os.pipe() # parent-read-error, child-write-error
108 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
112 if self.pid == 0: #### CHILD #### noqa: E262
113 # Preserve handle on *parent* stderr
114 parentErr = os.dup(self.in_fd)
115 # Reopen stdin, out, err, on pipe ends:
116 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
117 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
118 if self.control_stderr:
119 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
120 # Ensure (within reason) stray file descriptors are closed:
121 excludes = [self.in_fd, self.out_fd, self.err_fd]
122 for i in range(4, 100):
123 if i not in excludes:
130 os.execvp(cmd[0], cmd)
131 os._exit(1) # Shouldn't get here
133 except os.error as e:
134 if self.control_stderr:
135 os.dup2(parentErr, 2) # Reconnect to parent's stdout
136 sys.stderr.write("**execvp failed, '%s'**\n" %
139 os._exit(1) # Shouldn't get here.
141 else: ### PARENT ### noqa: E262
142 # Connect to the child's file descriptors, using our customized
144 self.toChild = os.fdopen(pWc, 'w')
145 self.toChild_fdlist = [pWc]
146 self.readbuf = ReadBuf(pRc)
147 self.errbuf = ReadBuf(pRe)
148 time.sleep(execvp_grace_seconds)
150 pid, err = os.waitpid(self.pid, os.WNOHANG)
151 except os.error as error:
155 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
157 raise SubprocessError(
158 "Subprocess failed[%d]: %s" % (errno, msg))
160 # child exited already
163 rc = (err & 0xff00) >> 8
165 raise SubprocessError(
166 "child killed by signal %d with a return code of %d"
169 raise SubprocessError(
170 "child exited with return code %d" % rc)
171 # Child may have exited, but not in error, so we won't say
172 # anything more at this point.
174 ### Write input to subprocess ### noqa: E266
176 def write(self, str):
177 """Write a STRING to the subprocess."""
180 raise SubprocessError("no child") # ===>
181 if select.select([], self.toChild_fdlist, [], 0)[1]:
182 self.toChild.write(str)
185 # XXX Can write-buffer full be handled better??
186 raise IOError("write to %s blocked" % self) # ===>
188 def writeline(self, line=''):
189 """Write STRING, with added newline termination, to subprocess."""
190 self.write(line + '\n')
192 ### Get output from subprocess ### noqa: E266
194 def peekPendingChar(self):
195 """Return, but (effectively) do not consume a single pending output
196 char, or return null string if none pending."""
198 return self.readbuf.peekPendingChar() # ===>
200 def peekPendingErrChar(self):
201 """Return, but (effectively) do not consume a single pending output
202 char, or return null string if none pending."""
204 return self.errbuf.peekPendingChar() # ===>
206 def waitForPendingChar(self, timeout, pollPause=.1):
207 """Block max TIMEOUT secs until we peek a pending char, returning the
208 char, or '' if none encountered.
210 Pause POLLPAUSE secs (default .1) between polls."""
214 nextChar = self.readbuf.peekPendingChar()
215 if nextChar or (accume > timeout):
217 time.sleep(pollPause)
218 accume = accume + pollPause
220 def read(self, n=None):
221 """Read N chars, or all pending if no N specified."""
223 return self.readPendingChars()
226 got0 = self.readPendingChars(n)
231 def readPendingChars(self, max=None):
232 """Read all currently pending subprocess output as a single string."""
233 return self.readbuf.readPendingChars(max)
235 def readPendingErrChars(self):
236 """Read all currently pending subprocess error output as a single
238 if self.control_stderr:
239 return self.errbuf.readPendingChars()
241 raise SubprocessError("Haven't grabbed subprocess error stream.")
243 def readPendingLine(self):
244 """Read currently pending subprocess output, up to a complete line
245 (newline inclusive)."""
246 return self.readbuf.readPendingLine()
248 def readPendingErrLine(self):
249 """Read currently pending subprocess error output, up to a complete
250 line (newline inclusive)."""
251 if self.control_stderr:
252 return self.errbuf.readPendingLine()
254 raise SubprocessError("Haven't grabbed subprocess error stream.")
257 """Return next complete line of subprocess output, blocking until
259 return self.readbuf.readline()
261 def readlineErr(self):
262 """Return next complete line of subprocess error output, blocking until
264 if self.control_stderr:
265 return self.errbuf.readline()
267 raise SubprocessError("Haven't grabbed subprocess error stream.")
269 ### Subprocess Control ### noqa: E266
272 """True if subprocess is alive and kicking."""
273 return self.status(boolean=1)
275 def status(self, boolean=0):
276 """Return string indicating whether process is alive or dead."""
279 status = 'sans command'
281 status = 'sans process'
282 elif not self.cont():
283 status = "(unresponding) '%s'" % self.cmd
285 status = "'%s'" % self.cmd
292 def stop(self, verbose=1):
293 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
296 os.kill(self.pid, signal.SIGSTOP)
300 "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
303 print("Stopped '%s'" % self.cmd)
306 def cont(self, verbose=0):
307 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
310 os.kill(self.pid, signal.SIGCONT)
313 print(("Continue failed for '%s' - '%s'" %
314 (self.cmd, sys.exc_value)))
317 print("Continued '%s'" % self.cmd)
321 """Send process PID signal SIG (default 9, 'kill'), returning None once
322 it is successfully reaped.
324 SubprocessError is raised if process is not successfully killed."""
327 raise SubprocessError("No process") # ===>
328 elif not self.cont():
329 raise SubprocessError("Can't signal subproc %s" % self) # ===>
331 # Try sending first a TERM and then a KILL signal.
332 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
335 os.kill(self.pid, sig[1])
339 # Try a couple or three times to reap the process with waitpid:
341 # WNOHANG == 1 on sunos, presumably same elsewhere.
342 if os.waitpid(self.pid, os.WNOHANG):
343 if self.expire_noisily:
344 print(("\n(%s subproc %d '%s' / %s)" %
345 (sig[0], self.pid, self.cmd,
347 for i in self.pipefiles:
352 del self.pipefiles[:]
356 # Only got here if subprocess is not gone:
357 raise SubprocessError(
358 "Failed kill of subproc %d, '%s', with signals %s" %
359 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
362 """Terminate the subprocess"""
367 status = self.status()
368 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
371 ##############################################################################
372 ##### Non-blocking read operations ##### noqa: E266
373 ##############################################################################
377 """Output buffer for non-blocking reads on selectable files like pipes and
378 sockets. Init with a file descriptor for the file."""
380 def __init__(self, fd, maxChunkSize=1024):
381 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
387 self.eof = 0 # May be set with stuff still in .buf
389 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
394 def peekPendingChar(self):
395 """Return, but don't consume, first character of unconsumed output from
396 file, or empty string if none."""
399 return self.buf[0] # ===>
404 sel = select.select([self.fd], [], [self.fd], 0)
408 self.buf = os.read(self.fd, self.chunkSize) # ===>
409 return self.buf[0] # Assume select don't lie.
413 def readPendingChar(self):
414 """Consume first character of unconsumed output from file, or empty
418 got, self.buf = self.buf[0], self.buf[1:]
424 sel = select.select([self.fd], [], [self.fd], 0)
428 return os.read(self.fd, 1) # ===>
432 def readPendingChars(self, max=None):
433 """Consume uncomsumed output from FILE, or empty string if nothing
438 if (max > 0) and (len(self.buf) > max):
439 got = self.buf[0:max]
440 self.buf = self.buf[max:]
442 got, self.buf = self.buf, ''
448 sel = select.select([self.fd], [], [self.fd], 0)
452 got = got + os.read(self.fd, self.chunkSize)
459 self.buf = self.buf + got[max:]
466 def readPendingLine(self, block=0):
467 """Return pending output from FILE, up to first newline (inclusive).
469 Does not block unless optional arg BLOCK is true.
471 Note that an error will be raised if a new eof is encountered without
475 to = string.find(self.buf, '\n')
477 got, self.buf = self.buf[:to+1], self.buf[to+1:]
479 got, self.buf = self.buf, ''
485 # Herein, 'got' contains the (former) contents of the buffer, and it
486 # doesn't include a newline.
488 period = block and 1.0 or 0 # don't be too busy while waiting
489 while 1: # (we'll only loop if block set)
490 sel = select.select(fdlist, [], fdlist, period)
494 got = got + os.read(self.fd, self.chunkSize)
496 to = string.find(got, '\n')
498 got, self.buf = got[:to+1], got[to+1:]
503 self.buf = '' # this is how an ordinary file acts...
505 # otherwise - no newline, blocking requested, no eof - loop. # ==^
508 """Return next output line from file, blocking until it is received."""
510 return self.readPendingLine(1) # ===>
513 #############################################################################
514 ##### Encapsulated reading and writing ##### noqa: E266
515 #############################################################################
516 # Encapsulate messages so the end can be unambiguously identified, even
517 # when they contain multiple, possibly empty lines.
521 """Encapsulate stream object for record-oriented IO.
523 Particularly useful when dealing with non-line oriented communications
524 over pipes, eg with subprocesses."""
526 # Message is written preceded by a line containing the message length.
528 def __init__(self, f):
531 def write_record(self, s):
532 "Write so self.read knows exactly how much to read."
533 f = self.__dict__['file']
534 f.write("%s\n%s" % (len(s), s))
535 if hasattr(f, 'flush'):
538 def read_record(self):
539 "Read and reconstruct message as prepared by self.write."
540 f = self.__dict__['file']
541 line = f.readline()[:-1]
544 l = string.atoi(line)
546 raise IOError(("corrupt %s file structure"
547 % self.__class__.__name__))
553 def __getattr__(self, attr):
554 """Implement characteristic file object attributes."""
555 f = self.__dict__['file']
557 return getattr(f, attr)
559 raise AttributeError(attr)
562 return "<%s of %s at %s>" % (self.__class__.__name__,
563 self.__dict__['file'],
568 """Exercise encapsulated write/read with an arbitrary string.
570 Raise IOError if the string gets distorted through transmission!"""
571 from StringIO import StringIO
577 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
579 raise IOError("String distorted:\n%s" % show)
582 #############################################################################
583 ##### An example subprocess interfaces ##### noqa: E266
584 #############################################################################
588 """Convenient interface to CCSO 'ph' nameserver subprocess.
590 .query('string...') method takes a query and returns a list of dicts, each
591 of which represents one entry."""
593 # Note that i made this a class that handles a subprocess object, rather
594 # than one that inherits from it. I didn't see any functional
595 # disadvantages, and didn't think that full support of the entire
596 # Subprocess functionality was in any way suitable for interaction with
597 # this specialized interface. ? klm 13-Jan-1995
601 self.proc = Subprocess('ph', 1)
603 raise SubprocessError('failure starting ph: %s' % # ===>
607 """Send a query and return a list of dicts for responses.
609 Raise a ValueError if ph responds with an error."""
613 self.proc.writeline('query ' + q)
617 response = self.getreply() # Should get null on new prompt.
618 errs = self.proc.readPendingErrChars()
620 sys.stderr.write(errs)
626 elif isinstance(response, str):
627 raise ValueError("ph failed match: '%s'" % response) # ===>
628 for line in response:
630 line = string.splitfields(line, ':')
631 it[string.strip(line[0])] = (
632 string.strip(string.join(line[1:])))
635 """Consume next response from ph, returning list of lines or string
637 # Key on first char: (First line may lack newline.)
638 # - dash discard line
639 # - 'ph> ' conclusion of response
640 # - number error message
641 # - whitespace beginning of next response
643 nextChar = self.proc.waitForPendingChar(60)
645 raise SubprocessError('ph subprocess not responding') # ===>
646 elif nextChar == '-':
647 # dashed line - discard it, and continue reading:
649 return self.getreply() # ===>
650 elif nextChar == 'p':
651 # 'ph> ' prompt - don't think we should hit this, but what the hay:
653 elif nextChar in '0123456789':
654 # Error notice - we're currently assuming single line errors:
655 return self.proc.readline()[:-1] # ===>
656 elif nextChar in ' \t':
657 # Get content, up to next dashed line:
659 while nextChar != '-' and nextChar != '':
660 got.append(self.proc.readline()[:-1])
661 nextChar = self.proc.peekPendingChar()
665 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
669 """Clear-out initial preface or residual subproc input and output."""
671 maxIter = 10 # 5 seconds to clear
675 while iterations < maxIter:
676 got = got + self.proc.readPendingChars()
677 # Strip out all but the last incomplete line:
678 got = string.splitfields(got, '\n')[-1]
682 raise SubprocessError('ph not responding within %s secs' %
686 ##############################################################################
687 ##### Test ##### noqa: E266
688 ##############################################################################
692 print("\tOpening bogus subprocess, should fail:")
695 print("\tOops! Null-named subprocess startup *succeeded*?!?")
696 except SubprocessError:
697 print("\t...yep, it failed.")
698 print("\tOpening cat subprocess:")
699 p = Subprocess('cat', 1) # set to expire noisily...
701 print('\tWrite, then read, two newline-teriminated lines, using readline:')
702 p.write('first full line written\n')
704 print(repr(p.readline()))
705 print(repr(p.readline()))
706 print('\tThree lines, last sans newline, read using combination:')
709 p.write('third, (no cr)')
710 print('\tFirst line via readline:')
711 print(repr(p.readline()))
712 print('\tRest via readPendingChars:')
713 print(p.readPendingChars())
714 print("\tStopping then continuing subprocess (verbose):")
715 if not p.stop(1): # verbose stop
716 print('\t** Stop seems to have failed!')
718 print('\tWriting line while subprocess is paused...')
719 p.write('written while subprocess paused\n')
720 print('\tNonblocking read of paused subprocess (should be empty):')
721 print(p.readPendingChars())
722 print('\tContinuing subprocess (verbose):')
723 if not p.cont(1): # verbose continue
725 '\t** Continue seems to have failed! Probly lost subproc...')
728 print('\tReading accumulated line, blocking read:')
730 print("\tDeleting subproc, which was set to die noisily:")
736 if __name__ == '__main__':