1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 Subprocess class features:
8 - provides non-blocking stdin and stderr reads
10 - provides subprocess stop and continue, kill-on-deletion
12 - provides detection of subprocess startup failure
14 - Subprocess objects have nice, informative string rep (as every good object
17 __version__ = "Revision: 1.15 "
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 # and subprocesses, primarily one by jose pereira.
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 # because i don't know what all in it is portable and what is not. I'm not
28 # about to provide for different platform contingencies - at that extent, the
29 # effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 # checked on regular IO, when a handler is defined, and passed to the
32 # handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
35 # - new additions (1.3, 1.4):
36 # - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 # reads based solely on os.read with select, while capitalizing big-time on
38 # multi-char read chunking.
39 # - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
41 # ken.manheimer@nist.gov
43 # This is a modified version by Oleg Broytman <phd@phdru.name>.
44 # The original version is still preserved at
45 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
47 import sys, os, string, time, types
52 SubprocessError = 'SubprocessError'
53 # You may need to increase execvp_grace_seconds, if you have a large or slow
55 execvp_grace_seconds = 0.5
58 """Run and communicate asynchronously with a subprocess.
60 Provides non-blocking reads in the form of .readPendingChars and
63 .readline will block until it gets a complete line.
65 .peekPendingChar does a non-blocking, non-consuming read for pending
66 output, and can be used before .readline to check non-destructively for
67 pending output. .waitForPendingChar(timeout, pollPause=.1) blocks until
68 a new character is pending, or timeout secs pass, with granularity of
71 There are corresponding read and peekPendingErrXXX routines, to read from
72 the subprocess stderr stream."""
76 expire_noisily = 1 # Announce subproc destruction?
78 readbuf = 0 # fork will assign to be a readbuf obj
79 errbuf = 0 # fork will assign to be a readbuf obj
81 def __init__(self, cmd, control_stderr=0, expire_noisily=0,
82 in_fd=0, out_fd=1, err_fd=2):
83 """Launch a subprocess, given command string COMMAND."""
86 self.expire_noisily = expire_noisily
87 self.control_stderr = control_stderr
88 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
91 def fork(self, cmd=None):
92 """Fork a subprocess with designated COMMAND (default, self.cmd)."""
93 if cmd: self.cmd = cmd
95 cmd = string.split(self.cmd)
96 pRc, cWp = os.pipe() # parent-read-child, child-write-parent
97 cRp, pWc = os.pipe() # child-read-parent, parent-write-child
98 pRe, cWe = os.pipe() # parent-read-error, child-write-error
99 self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
103 if self.pid == 0: #### CHILD ####
104 parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
105 # Reopen stdin, out, err, on pipe ends:
106 os.dup2(cRp, self.in_fd) # cRp = sys.stdin
107 os.dup2(cWp, self.out_fd) # cWp = sys.stdout
108 if self.control_stderr:
109 os.dup2(cWe, self.err_fd) # cWe = sys.stderr
110 # Ensure (within reason) stray file descriptors are closed:
111 excludes = [self.in_fd, self.out_fd, self.err_fd]
112 for i in range(4,100):
113 if i not in excludes:
115 except os.error: pass
118 os.execvp(cmd[0], cmd)
119 os._exit(1) # Shouldn't get here
121 except os.error as e:
122 if self.control_stderr:
123 os.dup2(parentErr, 2) # Reconnect to parent's stdout
124 sys.stderr.write("**execvp failed, '%s'**\n" %
127 os._exit(1) # Shouldn't get here.
130 # Connect to the child's file descriptors, using our customized
132 self.toChild = os.fdopen(pWc, 'w')
133 self.toChild_fdlist = [pWc]
134 self.readbuf = ReadBuf(pRc)
135 self.errbuf = ReadBuf(pRe)
136 time.sleep(execvp_grace_seconds)
138 pid, err = os.waitpid(self.pid, os.WNOHANG)
139 except os.error as error:
142 raise SubprocessError("Subprocess '%s' failed." % self.cmd)
143 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
145 # child exited already
148 rc = (err & 0xff00) >> 8
150 raise SubprocessError(
151 "child killed by signal %d with a return code of %d"
154 raise SubprocessError(
155 "child exited with return code %d" % rc)
156 # Child may have exited, but not in error, so we won't say
157 # anything more at this point.
159 ### Write input to subprocess ###
161 def write(self, str):
162 """Write a STRING to the subprocess."""
165 raise SubprocessError("no child") # ===>
166 if select.select([],self.toChild_fdlist,[],0)[1]:
167 self.toChild.write(str)
170 # XXX Can write-buffer full be handled better??
171 raise IOError("write to %s blocked" % self) # ===>
173 def writeline(self, line=''):
174 """Write STRING, with added newline termination, to subprocess."""
175 self.write(line + '\n')
177 ### Get output from subprocess ###
179 def peekPendingChar(self):
180 """Return, but (effectively) do not consume a single pending output
181 char, or return null string if none pending."""
183 return self.readbuf.peekPendingChar() # ===>
184 def peekPendingErrChar(self):
185 """Return, but (effectively) do not consume a single pending output
186 char, or return null string if none pending."""
188 return self.errbuf.peekPendingChar() # ===>
190 def waitForPendingChar(self, timeout, pollPause=.1):
191 """Block max TIMEOUT secs until we peek a pending char, returning the
192 char, or '' if none encountered.
194 Pause POLLPAUSE secs (default .1) between polls."""
198 nextChar = self.readbuf.peekPendingChar()
199 if nextChar or (accume > timeout): return nextChar
200 time.sleep(pollPause)
201 accume = accume + pollPause
203 def read(self, n=None):
204 """Read N chars, or all pending if no N specified."""
206 return self.readPendingChars()
209 got0 = self.readPendingChars(n)
213 def readPendingChars(self, max=None):
214 """Read all currently pending subprocess output as a single string."""
215 return self.readbuf.readPendingChars(max)
216 def readPendingErrChars(self):
217 """Read all currently pending subprocess error output as a single
219 if self.control_stderr:
220 return self.errbuf.readPendingChars()
222 raise SubprocessError("Haven't grabbed subprocess error stream.")
224 def readPendingLine(self):
225 """Read currently pending subprocess output, up to a complete line
226 (newline inclusive)."""
227 return self.readbuf.readPendingLine()
228 def readPendingErrLine(self):
229 """Read currently pending subprocess error output, up to a complete
230 line (newline inclusive)."""
231 if self.control_stderr:
232 return self.errbuf.readPendingLine()
234 raise SubprocessError("Haven't grabbed subprocess error stream.")
237 """Return next complete line of subprocess output, blocking until
239 return self.readbuf.readline()
240 def readlineErr(self):
241 """Return next complete line of subprocess error output, blocking until
243 if self.control_stderr:
244 return self.errbuf.readline()
246 raise SubprocessError("Haven't grabbed subprocess error stream.")
248 ### Subprocess Control ###
251 """True if subprocess is alive and kicking."""
252 return self.status(boolean=1)
253 def status(self, boolean=0):
254 """Return string indicating whether process is alive or dead."""
257 status = 'sans command'
259 status = 'sans process'
260 elif not self.cont():
261 status = "(unresponding) '%s'" % self.cmd
263 status = "'%s'" % self.cmd
270 def stop(self, verbose=1):
271 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
274 os.kill(self.pid, signal.SIGSTOP)
277 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
279 if verbose: print("Stopped '%s'" % self.cmd)
282 def cont(self, verbose=0):
283 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
286 os.kill(self.pid, signal.SIGCONT)
289 print(("Continue failed for '%s' - '%s'" %
290 (self.cmd, sys.exc_value)))
292 if verbose: print("Continued '%s'" % self.cmd)
296 """Send process PID signal SIG (default 9, 'kill'), returning None once
297 it is successfully reaped.
299 SubprocessError is raised if process is not successfully killed."""
302 raise SubprocessError("No process") # ===>
303 elif not self.cont():
304 raise SubprocessError("Can't signal subproc %s" % self) # ===>
306 # Try sending first a TERM and then a KILL signal.
308 sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
311 os.kill(self.pid, sig[1])
315 # Try a couple or three times to reap the process with waitpid:
317 # WNOHANG == 1 on sunos, presumably same elsewhere.
318 if os.waitpid(self.pid, os.WNOHANG):
319 if self.expire_noisily:
320 print(("\n(%s subproc %d '%s' / %s)" %
321 (sig[0], self.pid, self.cmd,
323 for i in self.pipefiles:
328 # Only got here if subprocess is not gone:
329 raise SubprocessError(
330 "Failed kill of subproc %d, '%s', with signals %s" %
331 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
334 """Terminate the subprocess"""
339 status = self.status()
340 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
342 #############################################################################
343 ##### Non-blocking read operations #####
344 #############################################################################
347 """Output buffer for non-blocking reads on selectable files like pipes and
348 sockets. Init with a file descriptor for the file."""
350 def __init__(self, fd, maxChunkSize=1024):
351 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
357 self.eof = 0 # May be set with stuff still in .buf
359 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
364 def peekPendingChar(self):
365 """Return, but don't consume, first character of unconsumed output from
366 file, or empty string if none."""
369 return self.buf[0] # ===>
374 sel = select.select([self.fd], [], [self.fd], 0)
378 self.buf = os.read(self.fd, self.chunkSize) # ===>
379 return self.buf[0] # Assume select don't lie.
380 else: return '' # ===>
383 def readPendingChar(self):
384 """Consume first character of unconsumed output from file, or empty
388 got, self.buf = self.buf[0], self.buf[1:]
394 sel = select.select([self.fd], [], [self.fd], 0)
398 return os.read(self.fd, 1) # ===>
399 else: return '' # ===>
401 def readPendingChars(self, max=None):
402 """Consume uncomsumed output from FILE, or empty string if nothing
407 if (max > 0) and (len(self.buf) > max):
408 got = self.buf[0:max]
409 self.buf = self.buf[max:]
411 got, self.buf = self.buf, ''
417 sel = select.select([self.fd], [], [self.fd], 0)
421 got = got + os.read(self.fd, self.chunkSize)
428 self.buf = self.buf + got[max:]
434 def readPendingLine(self, block=0):
435 """Return pending output from FILE, up to first newline (inclusive).
437 Does not block unless optional arg BLOCK is true.
439 Note that an error will be raised if a new eof is encountered without
443 to = string.find(self.buf, '\n')
445 got, self.buf = self.buf[:to+1], self.buf[to+1:]
447 got, self.buf = self.buf, ''
453 # Herein, 'got' contains the (former) contents of the buffer, and it
454 # doesn't include a newline.
456 period = block and 1.0 or 0 # don't be too busy while waiting
457 while 1: # (we'll only loop if block set)
458 sel = select.select(fdlist, [], fdlist, period)
462 got = got + os.read(self.fd, self.chunkSize)
464 to = string.find(got, '\n')
466 got, self.buf = got[:to+1], got[to+1:]
471 self.buf = '' # this is how an ordinary file acts...
473 # otherwise - no newline, blocking requested, no eof - loop. # ==^
476 """Return next output line from file, blocking until it is received."""
478 return self.readPendingLine(1) # ===>
481 #############################################################################
482 ##### Encapsulated reading and writing #####
483 #############################################################################
484 # Encapsulate messages so the end can be unambiguously identified, even
485 # when they contain multiple, possibly empty lines.
488 """Encapsulate stream object for record-oriented IO.
490 Particularly useful when dealing with non-line oriented communications
491 over pipes, eg with subprocesses."""
493 # Message is written preceded by a line containing the message length.
495 def __init__(self, f):
498 def write_record(self, s):
499 "Write so self.read knows exactly how much to read."
500 f = self.__dict__['file']
501 f.write("%s\n%s" % (len(s), s))
502 if hasattr(f, 'flush'):
505 def read_record(self):
506 "Read and reconstruct message as prepared by self.write."
507 f = self.__dict__['file']
508 line = f.readline()[:-1]
511 l = string.atoi(line)
513 raise IOError(("corrupt %s file structure"
514 % self.__class__.__name__))
520 def __getattr__(self, attr):
521 """Implement characteristic file object attributes."""
522 f = self.__dict__['file']
524 return getattr(f, attr)
526 raise AttributeError(attr)
529 return "<%s of %s at %s>" % (self.__class__.__name__,
530 self.__dict__['file'],
534 """Exercise encapsulated write/read with an arbitrary string.
536 Raise IOError if the string gets distorted through transmission!"""
537 from StringIO import StringIO
543 show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
545 raise IOError("String distorted:\n%s" % show)
547 #############################################################################
548 ##### An example subprocess interfaces #####
549 #############################################################################
552 """Convenient interface to CCSO 'ph' nameserver subprocess.
554 .query('string...') method takes a query and returns a list of dicts, each
555 of which represents one entry."""
557 # Note that i made this a class that handles a subprocess object, rather
558 # than one that inherits from it. I didn't see any functional
559 # disadvantages, and didn't think that full support of the entire
560 # Subprocess functionality was in any way suitable for interaction with
561 # this specialized interface. ? klm 13-Jan-1995
565 self.proc = Subprocess('ph', 1)
567 raise SubprocessError('failure starting ph: %s' % # ===>
571 """Send a query and return a list of dicts for responses.
573 Raise a ValueError if ph responds with an error."""
577 self.proc.writeline('query ' + q)
580 response = self.getreply() # Should get null on new prompt.
581 errs = self.proc.readPendingErrChars()
583 sys.stderr.write(errs)
589 elif type(response) == types.StringType:
590 raise ValueError("ph failed match: '%s'" % response) # ===>
591 for line in response:
593 line = string.splitfields(line, ':')
594 it[string.strip(line[0])] = (
595 string.strip(string.join(line[1:])))
598 """Consume next response from ph, returning list of lines or string
600 # Key on first char: (First line may lack newline.)
601 # - dash discard line
602 # - 'ph> ' conclusion of response
603 # - number error message
604 # - whitespace beginning of next response
606 nextChar = self.proc.waitForPendingChar(60)
608 raise SubprocessError('ph subprocess not responding') # ===>
609 elif nextChar == '-':
610 # dashed line - discard it, and continue reading:
612 return self.getreply() # ===>
613 elif nextChar == 'p':
614 # 'ph> ' prompt - don't think we should hit this, but what the hay:
616 elif nextChar in '0123456789':
617 # Error notice - we're currently assuming single line errors:
618 return self.proc.readline()[:-1] # ===>
619 elif nextChar in ' \t':
620 # Get content, up to next dashed line:
622 while nextChar != '-' and nextChar != '':
623 got.append(self.proc.readline()[:-1])
624 nextChar = self.proc.peekPendingChar()
627 return "<Ph instance, %s at %s>\n" % (self.proc.status(),
630 """Clear-out initial preface or residual subproc input and output."""
631 pause = .5; maxIter = 10 # 5 seconds to clear
635 while iterations < maxIter:
636 got = got + self.proc.readPendingChars()
637 # Strip out all but the last incomplete line:
638 got = string.splitfields(got, '\n')[-1]
639 if got == 'ph> ': return # Ok. ===>
641 raise SubprocessError('ph not responding within %s secs' %
644 #############################################################################
646 #############################################################################
649 print("\tOpening subprocess:")
650 p = Subprocess('cat', 1) # set to expire noisily...
652 print("\tOpening bogus subprocess, should fail:")
654 b = Subprocess('/', 1)
655 print("\tOops! Null-named subprocess startup *succeeded*?!?")
656 except SubprocessError:
657 print("\t...yep, it failed.")
658 print('\tWrite, then read, two newline-teriminated lines, using readline:')
659 p.write('first full line written\n'); p.write('second.\n')
660 print(repr(p.readline()))
661 print(repr(p.readline()))
662 print('\tThree lines, last sans newline, read using combination:')
663 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
664 print('\tFirst line via readline:')
665 print(repr(p.readline()))
666 print('\tRest via readPendingChars:')
667 print(p.readPendingChars())
668 print("\tStopping then continuing subprocess (verbose):")
669 if not p.stop(1): # verbose stop
670 print('\t** Stop seems to have failed!')
672 print('\tWriting line while subprocess is paused...')
673 p.write('written while subprocess paused\n')
674 print('\tNonblocking read of paused subprocess (should be empty):')
675 print(p.readPendingChars())
676 print('\tContinuing subprocess (verbose):')
677 if not p.cont(1): # verbose continue
678 print('\t** Continue seems to have failed! Probly lost subproc...')
681 print('\tReading accumulated line, blocking read:')
683 print("\tDeleting subproc, which was set to die noisily:")