3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
8 Subprocess class features:
- provides non-blocking reads of the subprocess's stdout and stderr
12 - provides subprocess stop and continue, kill-on-deletion
14 - provides detection of subprocess startup failure
16 - Subprocess objects have nice, informative string rep (as every good object
19 __version__ = "Revision: 2.0 "
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
# Prior art: Initially based on python code examples demonstrating usage of
# pipes and subprocesses, primarily one by jose pereira.
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 # because i don't know what all in it is portable and what is not. I'm not
30 # about to provide for different platform contingencies - at that extent, the
31 # effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 # checked on regular IO, when a handler is defined, and passed to the
34 # handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
37 # - new additions (1.3, 1.4):
38 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 # reads based solely on os.read with select, while capitalizing big-time on
40 # multi-char read chunking.
41 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
43 # ken.manheimer@nist.gov
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
56 class SubprocessError(Exception):
# You may need to increase execvp_grace_seconds, if you have a large or slow
# command search path (os.execvp looks the command up along PATH).
62 execvp_grace_seconds = 0.5
66 """Run and communicate asynchronously with a subprocess.
68 Provides non-blocking reads in the form of .readPendingChars and
71 .readline will block until it gets a complete line.
73 .peekPendingChar does a non-blocking, non-consuming read for pending
74 output, and can be used before .readline to check non-destructively for
75 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
76 a new character is pending, or timeout secs pass, with granularity of
79 There are corresponding read and peekPendingErrXXX routines, to read from
80 the subprocess stderr stream."""
84 expire_noisily = 1 # Announce subproc destruction?
86 readbuf = 0 # fork will assign to be a readbuf obj
87 errbuf = 0 # fork will assign to be a readbuf obj
89 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
90 in_fd=0, out_fd=1, err_fd=2):
91 """Launch a subprocess, given command string COMMAND."""
94 self.expire_noisily = expire_noisily
95 self.control_stderr = control_stderr
96 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
99 def fork(self, cmd=None):
100 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
103 cmd = self.cmd.split()
104 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
105 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
106 pRe, cWe = os.pipe() # parent-read-error, child-write-error
107 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
111 if self.pid == 0: #### CHILD #### noqa: E262
112 # Preserve handle on *parent* stderr
113 parentErr = os.dup(self.in_fd)
114 # Reopen stdin, out, err, on pipe ends:
115 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
116 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
117 if self.control_stderr:
118 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
119 # Ensure (within reason) stray file descriptors are closed:
120 excludes = [self.in_fd, self.out_fd, self.err_fd]
121 for i in range(4, 100):
122 if i not in excludes:
129 os.execvp(cmd[0], cmd)
130 os._exit(1) # Shouldn't get here
132 except os.error as e:
133 if self.control_stderr:
134 os.dup2(parentErr, 2) # Reconnect to parent's stdout
135 sys.stderr.write("**execvp failed, '%s'**\n" %
138 os._exit(1) # Shouldn't get here.
140 else: ### PARENT ### noqa: E262
141 # Connect to the child's file descriptors, using our customized
143 self.toChild = os.fdopen(pWc, 'w')
144 self.toChild_fdlist = [pWc]
145 self.readbuf = ReadBuf(pRc)
146 self.errbuf = ReadBuf(pRe)
147 time.sleep(execvp_grace_seconds)
149 pid, err = os.waitpid(self.pid, os.WNOHANG)
150 except os.error as error:
154 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
156 raise SubprocessError(
157 "Subprocess failed[%d]: %s" % (errno, msg))
159 # child exited already
162 rc = (err & 0xff00) >> 8
164 raise SubprocessError(
165 "child killed by signal %d with a return code of %d"
168 raise SubprocessError(
169 "child exited with return code %d" % rc)
170 # Child may have exited, but not in error, so we won't say
171 # anything more at this point.
173 ### Write input to subprocess ### noqa: E266
175 def write(self, str):
176 """Write a STRING to the subprocess."""
179 raise SubprocessError("no child") # ===>
180 if select.select([], self.toChild_fdlist, [], 0)[1]:
181 self.toChild.write(str)
184 # XXX Can write-buffer full be handled better??
185 raise IOError("write to %s blocked" % self) # ===>
def writeline(self, line=''):
    """Send LINE to the subprocess, terminated with a newline.

    The text is handed to self.write, so anything self.write raises
    propagates unchanged.  With the default empty LINE, a bare newline is
    sent."""
    terminated = line + '\n'
    self.write(terminated)
191 ### Get output from subprocess ### noqa: E266
def peekPendingChar(self):
    """Peek at the next pending character of subprocess output.

    The request is forwarded to the output read buffer (self.readbuf),
    whose peekPendingChar is documented as returning, without consuming,
    the first unconsumed character, or an empty string if none."""
    upcoming = self.readbuf.peekPendingChar()
    return upcoming
def peekPendingErrChar(self):
    """Peek at the next pending character of subprocess error output.

    The request is forwarded to the error read buffer (self.errbuf) and
    its result is returned as is.  Note that, unlike the readPending*Err*
    methods, no check of self.control_stderr is made here."""
    upcoming = self.errbuf.peekPendingChar()
    return upcoming
205 def waitForPendingChar(self, timeout, pollPause=.1):
206 """Block max TIMEOUT secs until we peek a pending char, returning the
207 char, or '' if none encountered.
209 Pause POLLPAUSE secs (default .1) between polls."""
213 nextChar = self.readbuf.peekPendingChar()
214 if nextChar or (accume > timeout):
216 time.sleep(pollPause)
217 accume = accume + pollPause
219 def read(self, n=None):
220 """Read N chars, or all pending if no N specified."""
222 return self.readPendingChars()
225 got0 = self.readPendingChars(n)
def readPendingChars(self, max=None):
    """Consume and return the subprocess output that is pending right now.

    The call is forwarded to self.readbuf.readPendingChars, and MAX is
    passed through untouched (the read buffer appears to treat it as an
    upper bound on how much to return - confirm against ReadBuf)."""
    pending = self.readbuf.readPendingChars(max)
    return pending
def readPendingErrChars(self):
    """Consume and return the subprocess error output pending right now,
    as a single string.

    Raises SubprocessError when the subprocess stderr was not taken over
    (self.control_stderr is false)."""
    if not self.control_stderr:
        raise SubprocessError("Haven't grabbed subprocess error stream.")
    return self.errbuf.readPendingChars()
def readPendingLine(self):
    """Consume and return pending subprocess output up to and including
    the first newline.

    The work is done by self.readbuf.readPendingLine, called with its
    default arguments; its result is returned unchanged."""
    pending = self.readbuf.readPendingLine()
    return pending
def readPendingErrLine(self):
    """Consume and return pending subprocess error output up to and
    including the first newline.

    Raises SubprocessError when the subprocess stderr was not taken over
    (self.control_stderr is false)."""
    if not self.control_stderr:
        raise SubprocessError("Haven't grabbed subprocess error stream.")
    return self.errbuf.readPendingLine()
256 """Return next complete line of subprocess output, blocking until
258 return self.readbuf.readline()
def readlineErr(self):
    """Return the next complete line of subprocess error output, as
    delivered by self.errbuf.readline (which is documented as blocking
    until the line is received).

    Raises SubprocessError when the subprocess stderr was not taken over
    (self.control_stderr is false)."""
    if not self.control_stderr:
        raise SubprocessError("Haven't grabbed subprocess error stream.")
    return self.errbuf.readline()
268 ### Subprocess Control ### noqa: E266
271 """True if subprocess is alive and kicking."""
272 return self.status(boolean=1)
274 def status(self, boolean=0):
275 """Return string indicating whether process is alive or dead."""
278 status = 'sans command'
280 status = 'sans process'
281 elif not self.cont():
282 status = "(unresponding) '%s'" % self.cmd
284 status = "'%s'" % self.cmd
291 def stop(self, verbose=1):
292 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
295 os.kill(self.pid, signal.SIGSTOP)
299 "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
302 print("Stopped '%s'" % self.cmd)
305 def cont(self, verbose=0):
306 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
309 os.kill(self.pid, signal.SIGCONT)
312 print(("Continue failed for '%s' - '%s'" %
313 (self.cmd, sys.exc_value)))
316 print("Continued '%s'" % self.cmd)
320 """Send process PID signal SIG (default 9, 'kill'), returning None once
321 it is successfully reaped.
323 SubprocessError is raised if process is not successfully killed."""
326 raise SubprocessError("No process") # ===>
327 elif not self.cont():
328 raise SubprocessError("Can't signal subproc %s" % self) # ===>
330 # Try sending first a TERM and then a KILL signal.
331 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
334 os.kill(self.pid, sig[1])
338 # Try a couple or three times to reap the process with waitpid:
340 # WNOHANG == 1 on sunos, presumably same elsewhere.
341 if os.waitpid(self.pid, os.WNOHANG):
342 if self.expire_noisily:
343 print(("\n(%s subproc %d '%s' / %s)" %
344 (sig[0], self.pid, self.cmd,
346 for i in self.pipefiles:
351 del self.pipefiles[:]
355 # Only got here if subprocess is not gone:
356 raise SubprocessError(
357 "Failed kill of subproc %d, '%s', with signals %s" %
358 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
361 """Terminate the subprocess"""
366 status = self.status()
367 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
370 ##############################################################################
371 ##### Non-blocking read operations ##### noqa: E266
372 ##############################################################################
376 """Output buffer for non-blocking reads on selectable files like pipes and
377 sockets. Init with a file descriptor for the file."""
379 def __init__(self, fd, maxChunkSize=1024):
380 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
386 self.eof = 0 # May be set with stuff still in .buf
388 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
393 def peekPendingChar(self):
394 """Return, but don't consume, first character of unconsumed output from
395 file, or empty string if none."""
398 return self.buf[0] # ===>
403 sel = select.select([self.fd], [], [self.fd], 0)
407 self.buf = os.read(self.fd, self.chunkSize) # ===>
408 return self.buf[0] # Assume select don't lie.
412 def readPendingChar(self):
413 """Consume first character of unconsumed output from file, or empty
417 got, self.buf = self.buf[0], self.buf[1:]
423 sel = select.select([self.fd], [], [self.fd], 0)
427 return os.read(self.fd, 1) # ===>
431 def readPendingChars(self, max=None):
"""Consume unconsumed output from FILE, or empty string if nothing
437 if (max > 0) and (len(self.buf) > max):
438 got = self.buf[0:max]
439 self.buf = self.buf[max:]
441 got, self.buf = self.buf, ''
447 sel = select.select([self.fd], [], [self.fd], 0)
451 got = got + os.read(self.fd, self.chunkSize)
458 self.buf = self.buf + got[max:]
465 def readPendingLine(self, block=0):
466 """Return pending output from FILE, up to first newline (inclusive).
468 Does not block unless optional arg BLOCK is true.
470 Note that an error will be raised if a new eof is encountered without
474 to = self.buf.find('\n')
476 got, self.buf = self.buf[:to+1], self.buf[to+1:]
478 got, self.buf = self.buf, ''
484 # Herein, 'got' contains the (former) contents of the buffer, and it
485 # doesn't include a newline.
487 period = block and 1.0 or 0 # don't be too busy while waiting
488 while 1: # (we'll only loop if block set)
489 sel = select.select(fdlist, [], fdlist, period)
493 got = got + os.read(self.fd, self.chunkSize)
497 got, self.buf = got[:to+1], got[to+1:]
502 self.buf = '' # this is how an ordinary file acts...
504 # otherwise - no newline, blocking requested, no eof - loop. # ==^
507 """Return next output line from file, blocking until it is received."""
509 return self.readPendingLine(1) # ===>
512 #############################################################################
513 ##### Encapsulated reading and writing ##### noqa: E266
514 #############################################################################
515 # Encapsulate messages so the end can be unambiguously identified, even
516 # when they contain multiple, possibly empty lines.
520 """Encapsulate stream object for record-oriented IO.
522 Particularly useful when dealing with non-line oriented communications
523 over pipes, eg with subprocesses."""
525 # Message is written preceded by a line containing the message length.
527 def __init__(self, f):
def write_record(self, s):
    """Write S as one record so the reader knows exactly how much to read.

    The record is the length of S on a line of its own, immediately
    followed by S itself, so S may contain newlines or be empty.  The
    wrapped file is fetched straight from the instance __dict__.  If the
    wrapped file has a 'flush' attribute it is flushed afterwards, so the
    record is pushed out rather than left sitting in a write buffer."""
    stream = self.__dict__['file']
    stream.write("%s\n%s" % (len(s), s))
    if hasattr(stream, 'flush'):
        stream.flush()
537 def read_record(self):
538 "Read and reconstruct message as prepared by self.write."
539 f = self.__dict__['file']
540 line = f.readline()[:-1]
545 raise IOError(("corrupt %s file structure"
546 % self.__class__.__name__))
552 def __getattr__(self, attr):
553 """Implement characteristic file object attributes."""
554 f = self.__dict__['file']
556 return getattr(f, attr)
558 raise AttributeError(attr)
561 return "<%s of %s at %s>" % (self.__class__.__name__,
562 self.__dict__['file'],
567 """Exercise encapsulated write/read with an arbitrary string.
569 Raise IOError if the string gets distorted through transmission!"""
570 from StringIO import StringIO
576 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
578 raise IOError("String distorted:\n%s" % show)
581 #############################################################################
582 ##### An example subprocess interfaces ##### noqa: E266
583 #############################################################################
587 """Convenient interface to CCSO 'ph' nameserver subprocess.
589 .query('string...') method takes a query and returns a list of dicts, each
590 of which represents one entry."""
592 # Note that i made this a class that handles a subprocess object, rather
593 # than one that inherits from it. I didn't see any functional
594 # disadvantages, and didn't think that full support of the entire
595 # Subprocess functionality was in any way suitable for interaction with
596 # this specialized interface. ? klm 13-Jan-1995
600 self.proc = Subprocess('ph', 1)
602 raise SubprocessError('failure starting ph: %s' % # ===>
606 """Send a query and return a list of dicts for responses.
608 Raise a ValueError if ph responds with an error."""
612 self.proc.writeline('query ' + q)
616 response = self.getreply() # Should get null on new prompt.
617 errs = self.proc.readPendingErrChars()
619 sys.stderr.write(errs)
625 elif isinstance(response, str):
626 raise ValueError("ph failed match: '%s'" % response) # ===>
627 for line in response:
629 line = line.split(':')
630 it[line.strip([0])] = (''.join(line[1:])).strip()
633 """Consume next response from ph, returning list of lines or string
635 # Key on first char: (First line may lack newline.)
636 # - dash discard line
637 # - 'ph> ' conclusion of response
638 # - number error message
639 # - whitespace beginning of next response
641 nextChar = self.proc.waitForPendingChar(60)
643 raise SubprocessError('ph subprocess not responding') # ===>
644 elif nextChar == '-':
645 # dashed line - discard it, and continue reading:
647 return self.getreply() # ===>
648 elif nextChar == 'p':
649 # 'ph> ' prompt - don't think we should hit this, but what the hay:
651 elif nextChar in '0123456789':
652 # Error notice - we're currently assuming single line errors:
653 return self.proc.readline()[:-1] # ===>
654 elif nextChar in ' \t':
655 # Get content, up to next dashed line:
657 while nextChar != '-' and nextChar != '':
658 got.append(self.proc.readline()[:-1])
659 nextChar = self.proc.peekPendingChar()
663 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
667 """Clear-out initial preface or residual subproc input and output."""
669 maxIter = 10 # 5 seconds to clear
673 while iterations < maxIter:
674 got = got + self.proc.readPendingChars()
675 # Strip out all but the last incomplete line:
676 got = got.split('\n')[-1]
680 raise SubprocessError('ph not responding within %s secs' %
684 ##############################################################################
685 ##### Test ##### noqa: E266
686 ##############################################################################
690 print("\tOpening bogus subprocess, should fail:")
693 print("\tOops! Null-named subprocess startup *succeeded*?!?")
694 except SubprocessError:
695 print("\t...yep, it failed.")
696 print("\tOpening cat subprocess:")
697 p = Subprocess('cat', 1) # set to expire noisily...
699 print('\tWrite, then read, two newline-teriminated lines, using readline:')
700 p.write('first full line written\n')
702 print(repr(p.readline()))
703 print(repr(p.readline()))
704 print('\tThree lines, last sans newline, read using combination:')
707 p.write('third, (no cr)')
708 print('\tFirst line via readline:')
709 print(repr(p.readline()))
710 print('\tRest via readPendingChars:')
711 print(p.readPendingChars())
712 print("\tStopping then continuing subprocess (verbose):")
713 if not p.stop(1): # verbose stop
714 print('\t** Stop seems to have failed!')
716 print('\tWriting line while subprocess is paused...')
717 p.write('written while subprocess paused\n')
718 print('\tNonblocking read of paused subprocess (should be empty):')
719 print(p.readPendingChars())
720 print('\tContinuing subprocess (verbose):')
721 if not p.cont(1): # verbose continue
723 '\t** Continue seems to have failed! Probly lost subproc...')
726 print('\tReading accumulated line, blocking read:')
728 print("\tDeleting subproc, which was set to die noisily:")
734 if __name__ == '__main__':