]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Add a few TODO items
[bookmarks_db.git] / subproc.py
1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
2
3 Requires that platform supports, eg, posix-style os.pipe and os.fork.
4
5 Subprocess class features:
6
7  - provides non-blocking stdin and stderr reads
8
9  - provides subprocess stop and continue, kill-on-deletion
10
11  - provides detection of subprocess startup failure
12
13  - Subprocess objects have nice, informative string rep (as every good object
14    ought).
15
16  - RecordFile class provides record-oriented IO for file-like stream objects.
17 """
18
19 __version__ = "Revision: 1.7 "
20
21 # Id: subproc.py,v 1.7 1998
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
23
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 #            and subprocesses, primarily one by jose pereira.
26
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 #   because i don't know what all in it is portable and what is not.  I'm not
30 #   about to provide for different platform contingencies - at that extent, the
31 #   effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 #           checked on regular IO, when a handler is defined, and passed to the
34 #           handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
36
37 # - new additions (1.3, 1.4):
38 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 #    reads based solely on os.read with select, while capitalizing big-time on
40 #    multi-char read chunking.
41 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
42 #
43 # ken.manheimer@nist.gov
44
45
46 import sys, os, string, time, types
47 import select
48 import signal
49
50
51 SubprocessError = 'SubprocessError'
52 # You may need to increase execvp_grace_seconds, if you have a large or slow
53 # path to search:
54 execvp_grace_seconds = 0.5
55
56 class Subprocess:
57     """Run and communicate asynchronously with a subprocess.
58
59     Provides non-blocking reads in the form of .readPendingChars and
60     .readPendingLine.
61
62     .readline will block until it gets a complete line.
63
64     .peekPendingChar does a non-blocking, non-consuming read for pending
65     output, and can be used before .readline to check non-destructively for
66     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
67     a new character is pending, or timeout secs pass, with granularity of
68     pollPause seconds.
69
70     There are corresponding read and peekPendingErrXXX routines, to read from
71     the subprocess stderr stream."""
72
73     pid = 0
74     cmd = ''
75     expire_noisily = 1                  # Announce subproc destruction?
76     pipefiles = []
77     readbuf = 0                         # fork will assign to be a readbuf obj
78     errbuf = 0                          # fork will assign to be a readbuf obj
79
80     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
81                  in_fd=0, out_fd=1, err_fd=2):
82         """Launch a subprocess, given command string COMMAND."""
83         self.cmd = cmd
84         self.pid = 0
85         self.expire_noisily = expire_noisily
86         self.control_stderr = control_stderr
87         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
88         self.fork()
89
90     def fork(self, cmd=None):
91         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
92         if cmd: self.cmd = cmd
93         else: cmd = self.cmd
94         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
95         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
96         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
97         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
98
99         self.pid = os.fork()
100
101         if self.pid == 0:       #### CHILD ####
102             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
103             # Reopen stdin, out, err, on pipe ends:
104             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
105             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
106             if self.control_stderr:
107                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
108             # Ensure (within reason) stray file descriptors are closed:
109             excludes = [self.in_fd, self.out_fd, self.err_fd]
110             for i in range(4,100):
111                 if i not in excludes:
112                     try: os.close(i)
113                     except os.error: pass
114
115             self.run_cmd(cmd)
116             os._exit(1)                 # Shouldn't get here.
117
118         else:           ### PARENT ###
119             # Connect to the child's file descriptors, using our customized
120             # fdopen:
121             self.toChild = os.fdopen(pWc, 'w')
122             self.toChild_fdlist = [pWc]
123             self.readbuf = ReadBuf(pRc)
124             self.errbuf = ReadBuf(pRe)
125             time.sleep(execvp_grace_seconds)
126             try:
127                 pid, err = os.waitpid(self.pid, os.WNOHANG)
128             except os.error, (errno, msg):
129                 if errno == 10:
130                     raise SubprocessError, \
131                           "Subprocess '%s' failed." % self.cmd
132                 raise SubprocessError, \
133                       "Subprocess failed[%d]: %s" % (errno, msg)
134             if pid == self.pid:
135                 # child exited already
136                 self.pid == None
137                 sig = err & 0xff
138                 rc = (err & 0xff00) >> 8
139                 if sig:
140                     raise SubprocessError, (
141                     "child killed by signal %d with a return code of %d"
142                     % (sig, rc))
143                 if rc:
144                     raise SubprocessError, \
145                           "child exited with return code %d" % rc
146                 # Child may have exited, but not in error, so we won't say
147                 # anything more at this point.
148
149     def run_cmd(self, cmd):
150         cmd = string.split(self.cmd)
151
152         try:
153             os.execvp(cmd[0], cmd)
154             os._exit(1)                     # Shouldn't get here
155
156         except os.error, e:
157             if self.control_stderr:
158                 os.dup2(parentErr, 2)       # Reconnect to parent's stdout
159             sys.stderr.write("**execvp failed, '%s'**\n" %
160                              str(e))
161             os._exit(1)
162
163
164     ### Write input to subprocess ###
165
166     def write(self, str):
167         """Write a STRING to the subprocess."""
168
169         if not self.pid:
170             raise SubprocessError, "no child"                           # ===>
171         if select.select([],self.toChild_fdlist,[],0)[1]:
172             self.toChild.write(str)
173             self.toChild.flush()
174         else:
175             # XXX Can write-buffer full be handled better??
176             raise IOError, "write to %s blocked" % self                 # ===>
177
178     def writeline(self, line=''):
179         """Write STRING, with added newline termination, to subprocess."""
180         self.write(line + '\n')
181
182     ### Get output from subprocess ###
183
184     def peekPendingChar(self):
185         """Return, but (effectively) do not consume a single pending output
186         char, or return null string if none pending."""
187
188         return self.readbuf.peekPendingChar()                           # ===>
189     def peekPendingErrChar(self):
190         """Return, but (effectively) do not consume a single pending output
191         char, or return null string if none pending."""
192
193         return self.errbuf.peekPendingChar()                            # ===>
194
195     def waitForPendingChar(self, timeout, pollPause=.1):
196         """Block max TIMEOUT secs until we peek a pending char, returning the
197         char, or '' if none encountered.
198
199         Pause POLLPAUSE secs (default .1) between polls."""
200
201         accume = 0
202         while 1:
203             nextChar = self.readbuf.peekPendingChar()
204             if nextChar or (accume > timeout): return nextChar
205             time.sleep(pollPause)
206             accume = accume + pollPause
207
208     def read(self, n=None):
209         """Read N chars, or all pending if no N specified."""
210         if n == None:
211             return self.readPendingChars()
212         got = ''
213         while n:
214             got0 = self.readPendingChars(n)
215             got = got + got0
216             n = n - len(got0)
217         return got      
218     def readPendingChars(self, max=None):
219         """Read all currently pending subprocess output as a single string."""
220         return self.readbuf.readPendingChars(max)
221     def readPendingErrChars(self):
222         """Read all currently pending subprocess error output as a single
223         string."""
224         if self.control_stderr:
225             return self.errbuf.readPendingChars()
226         else:
227             raise SubprocessError, "Haven't grabbed subprocess error stream."
228
229     def readPendingLine(self):
230         """Read currently pending subprocess output, up to a complete line
231         (newline inclusive)."""
232         return self.readbuf.readPendingLine()
233     def readPendingErrLine(self):
234         """Read currently pending subprocess error output, up to a complete
235         line (newline inclusive)."""
236         if self.control_stderr:
237             return self.errbuf.readPendingLine()
238         else:
239             raise SubprocessError, "Haven't grabbed subprocess error stream."
240
241     def readline(self):
242         """Return next complete line of subprocess output, blocking until
243         then."""
244         return self.readbuf.readline()
245     def readlineErr(self):
246         """Return next complete line of subprocess error output, blocking until
247         then."""
248         if self.control_stderr:
249             return self.errbuf.readline()
250         else:
251             raise SubprocessError, "Haven't grabbed subprocess error stream."
252
253     ### Subprocess Control ###
254
255     def active(self):
256         """True if subprocess is alive and kicking."""
257         return self.status(boolean=1)
258     def status(self, boolean=0):
259         """Return string indicating whether process is alive or dead."""
260         active = 0
261         if not self.cmd:
262             status = 'sans command'
263         elif not self.pid:
264             status = 'sans process'
265         elif not self.cont():
266             status = "(unresponding) '%s'" % self.cmd
267         else:
268             status = "'%s'" % self.cmd
269             active = 1
270         if boolean:
271             return active
272         else:
273             return status
274
275     def stop(self, verbose=1):
276         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
277         otherwise."""
278         try:
279             os.kill(self.pid, signal.SIGSTOP)
280         except os.error:
281             if verbose:
282                 print "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)
283             return 0
284         if verbose: print "Stopped '%s'" % self.cmd
285         return 'stopped'
286
287     def cont(self, verbose=0):
288         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
289         otherwise."""
290         try:
291             os.kill(self.pid, signal.SIGCONT)
292         except os.error:
293             if verbose:
294                 print ("Continue failed for '%s' - '%s'" %
295                        (self.cmd, sys.exc_value))
296             return 0
297         if verbose: print "Continued '%s'" % self.cmd
298         return 'continued'
299
300     def die(self):
301         """Send process PID signal SIG (default 9, 'kill'), returning None once
302          it is successfully reaped.
303
304         SubprocessError is raised if process is not successfully killed."""
305
306         if not self.pid:
307             raise SubprocessError, "No process"                         # ===>
308         elif not self.cont():
309             raise SubprocessError, "Can't signal subproc %s" % self     # ===>
310
311         # Try sending first a TERM and then a KILL signal.
312         keep_trying = 1
313         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
314         for sig in sigs:
315             try:
316                 os.kill(self.pid, sig[1])
317             except posix.error:
318                 # keep trying
319                 pass
320             # Try a couple or three times to reap the process with waitpid:
321             for i in range(5):
322                 # WNOHANG == 1 on sunos, presumably same elsewhere.
323                 if os.waitpid(self.pid, os.WNOHANG):
324                     if self.expire_noisily:
325                         print ("\n(%s subproc %d '%s' / %s)" %
326                                (sig[0], self.pid, self.cmd,
327                                 hex(id(self))[2:]))
328                     for i in self.pipefiles:
329                         os.close(i)
330                     self.pid = 0
331                     return None                                         # ===>
332                 time.sleep(.1)
333         # Only got here if subprocess is not gone:
334         raise (SubprocessError,
335                ("Failed kill of subproc %d, '%s', with signals %s" %
336                 (self.pid, self.cmd, map(lambda(x): x[0], sigs))))
337
338     def __del__(self):
339         """Terminate the subprocess"""
340         if self.pid:
341             self.die()
342
343     def __repr__(self):
344         status = self.status()
345         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
346
347 # The name of the class is a pun; it is short for "Process", but it also appeals
348 # to the word "Procedure"
349 class Subproc(Subprocess):
350     def run_cmd(self, cmd):
351         apply(cmd[0], cmd[1:])
352         os._exit(1)
353
354 #############################################################################
355 #####                 Non-blocking read operations                      #####
356 #############################################################################
357
358 class ReadBuf:
359     """Output buffer for non-blocking reads on selectable files like pipes and
360     sockets.  Init with a file descriptor for the file."""
361
362     def __init__(self, fd, maxChunkSize=1024):
363         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
364         (default 1024)."""
365
366         if fd < 0:
367             raise ValueError
368         self.fd = fd
369         self.eof = 0                    # May be set with stuff still in .buf
370         self.buf = ''
371         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
372
373     def fileno(self):
374         return self.fd
375
376     def peekPendingChar(self):
377         """Return, but don't consume, first character of unconsumed output from
378         file, or empty string if none."""
379
380         if self.buf:
381             return self.buf[0]                                          # ===>
382
383         if self.eof:
384             return ''                                                   # ===>
385
386         sel = select.select([self.fd], [], [self.fd], 0)
387         if sel[2]:
388             self.eof = 1
389         if sel[0]:
390             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
391             return self.buf[0]          # Assume select don't lie.
392         else: return ''                                                 # ===>
393
394
395     def readPendingChar(self):
396         """Consume first character of unconsumed output from file, or empty
397         string if none."""
398
399         if self.buf:
400             got, self.buf = self.buf[0], self.buf[1:]
401             return got                                                  # ===>
402
403         if self.eof:
404             return ''                                                   # ===>
405
406         sel = select.select([self.fd], [], [self.fd], 0)
407         if sel[2]:
408             self.eof = 1
409         if sel[0]:
410             return os.read(self.fd, 1)                                  # ===>
411         else: return ''                                                 # ===>
412
413     def readPendingChars(self, max=None):
414         """Consume uncomsumed output from FILE, or empty string if nothing
415         pending."""
416
417         got = ""
418         if self.buf:
419             got, self.buf = self.buf, ''
420             return got                                                  # ===>
421
422         if self.eof:
423             return ''
424
425         sel = select.select([self.fd], [], [self.fd], 0)
426         if sel[2]:
427             self.eof = 1
428         if sel[0]:
429             got = got + os.read(self.fd, self.chunkSize)
430             if max == 0:
431                 self.buf = got
432                 return ''
433             elif max == None:
434                 return got
435             elif len(got) > max:
436                 self.buf = self.buf + got[max:]
437                 return got[:max]
438             else:
439                 return got
440         else: return ''
441
442     def readPendingLine(self, block=0):
443         """Return pending output from FILE, up to first newline (inclusive).
444
445         Does not block unless optional arg BLOCK is true.
446
447         Note that an error will be raised if a new eof is encountered without
448         any newline."""
449
450         if self.buf:
451             to = string.find(self.buf, '\n')
452             if to != -1:
453                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
454                 return got                                              # ===>
455             got, self.buf = self.buf, ''
456         else:
457             if self.eof:
458                 return ''                                               # ===>
459             got = ''
460
461         # Herein, 'got' contains the (former) contents of the buffer, and it
462         # doesn't include a newline.
463         fdlist = [self.fd]
464         period = block and 1.0 or 0     # don't be too busy while waiting
465         while 1:                        # (we'll only loop if block set)
466             sel = select.select(fdlist, [], fdlist, period)
467             if sel[2]:
468                 self.eof = 1
469             if sel[0]:
470                 got = got + os.read(self.fd, self.chunkSize)
471
472             to = string.find(got, '\n')
473             if to != -1:
474                 got, self.buf = got[:to+1], got[to+1:]
475                 return got                                              # ===>
476             if not block:
477                 return got                                              # ===>
478             if self.eof:
479                 self.buf = ''           # this is how an ordinary file acts...
480                 return got
481             # otherwise - no newline, blocking requested, no eof - loop. # ==^
482
483     def readline(self):
484         """Return next output line from file, blocking until it is received."""
485
486         return self.readPendingLine(1)                                  # ===>
487
488
489 #############################################################################
490 #####                 Encapsulated reading and writing                  #####
491 #############################################################################
492 # Encapsulate messages so the end can be unambiguously identified, even
493 # when they contain multiple, possibly empty lines.
494
495 class RecordFile:
496     """Encapsulate stream object for record-oriented IO.
497
498     Particularly useful when dealing with non-line oriented communications
499     over pipes, eg with subprocesses."""
500
501     # Message is written preceded by a line containing the message length.
502
503     def __init__(self, f):
504         self.file = f
505
506     def write_record(self, s):
507         "Write so self.read knows exactly how much to read."
508         f = self.__dict__['file']
509         f.write("%s\n%s" % (len(s), s))
510         if hasattr(f, 'flush'):
511             f.flush()
512
513     def read_record(self):
514         "Read and reconstruct message as prepared by self.write."
515         f = self.__dict__['file']
516         line = f.readline()[:-1]
517         if line:
518             try:
519                 l = string.atoi(line)
520             except ValueError:
521                 raise IOError, ("corrupt %s file structure"
522                                 % self.__class__.__name__)
523             return f.read(l)
524         else:
525             # EOF.
526             return ''
527
528     def __getattr__(self, attr):
529         """Implement characteristic file object attributes."""
530         f = self.__dict__['file']
531         if hasattr(f, attr):
532             return getattr(f, attr)
533         else:
534             raise AttributeError, attr
535
536     def __repr__(self):
537         return "<%s of %s at %s>" % (self.__class__.__name__,
538                                      self.__dict__['file'],
539                                      hex(id(self))[2:])
540
541 def record_trial(s):
542     """Exercise encapsulated write/read with an arbitrary string.
543
544     Raise IOError if the string gets distorted through transmission!"""
545     from StringIO import StringIO
546     sf = StringIO()
547     c = RecordFile(sf)
548     c.write(s)
549     c.seek(0)
550     r = c.read()
551     show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
552     if r != s:
553         raise IOError, "String distorted:\n%s" % show
554
555 #############################################################################
556 #####                   An example subprocess interfaces                #####
557 #############################################################################
558
559 class Ph:
560     """Convenient interface to CCSO 'ph' nameserver subprocess.
561
562     .query('string...') method takes a query and returns a list of dicts, each
563     of which represents one entry."""
564
565     # Note that i made this a class that handles a subprocess object, rather
566     # than one that inherits from it.  I didn't see any functional
567     # disadvantages, and didn't think that full support of the entire
568     # Subprocess functionality was in any way suitable for interaction with
569     # this specialized interface.  ?  klm 13-Jan-1995
570
571     def __init__(self):
572         try:
573             self.proc = Subprocess('ph', 1)
574         except:
575             raise SubprocessError, ('failure starting ph: %s' %         # ===>
576                                     str(sys.exc_value))
577
578     def query(self, q):
579         """Send a query and return a list of dicts for responses.
580
581         Raise a ValueError if ph responds with an error."""
582
583         self.clear()
584
585         self.proc.writeline('query ' + q)
586         got = []; it = {}
587         while 1:
588             response = self.getreply()  # Should get null on new prompt.
589             errs = self.proc.readPendingErrChars()
590             if errs:
591                 sys.stderr.write(errs)
592             if it:
593                 got.append(it)
594                 it = {}
595             if not response:
596                 return got                                              # ===>
597             elif type(response) == types.StringType:
598                 raise ValueError, "ph failed match: '%s'" % response    # ===>
599             for line in response:
600                 # convert to a dict:
601                 line = string.splitfields(line, ':')
602                 it[string.strip(line[0])] = (
603                     string.strip(string.join(line[1:])))
604         
605     def getreply(self):
606         """Consume next response from ph, returning list of lines or string
607         err."""
608         # Key on first char:  (First line may lack newline.)
609         #  - dash       discard line
610         #  - 'ph> '     conclusion of response
611         #  - number     error message
612         #  - whitespace beginning of next response
613
614         nextChar = self.proc.waitForPendingChar(60)
615         if not nextChar:
616             raise SubprocessError, 'ph subprocess not responding'       # ===>
617         elif nextChar == '-':
618             # dashed line - discard it, and continue reading:
619             self.proc.readline()
620             return self.getreply()                                      # ===>
621         elif nextChar == 'p':
622             # 'ph> ' prompt - don't think we should hit this, but what the hay:
623             return ''                                                   # ===>
624         elif nextChar in '0123456789':
625             # Error notice - we're currently assuming single line errors:
626             return self.proc.readline()[:-1]                            # ===>
627         elif nextChar in ' \t':
628             # Get content, up to next dashed line:
629             got = []
630             while nextChar != '-' and nextChar != '':
631                 got.append(self.proc.readline()[:-1])
632                 nextChar = self.proc.peekPendingChar()
633             return got
634     def __repr__(self):
635         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
636                                              hex(id(self))[2:])
637     def clear(self):
638         """Clear-out initial preface or residual subproc input and output."""
639         pause = .5; maxIter = 10        # 5 seconds to clear
640         iterations = 0
641         got = ''
642         self.proc.write('')
643         while iterations < maxIter:
644             got = got + self.proc.readPendingChars()
645             # Strip out all but the last incomplete line:
646             got = string.splitfields(got, '\n')[-1]
647             if got == 'ph> ': return    # Ok.                             ===>
648             time.sleep(pause)
649         raise SubprocessError, ('ph not responding within %s secs' %
650                                 pause * maxIter)
651
652 #############################################################################
653 #####                             Test                                  #####
654 #############################################################################
655
656 def test(p=0):
657     print "\tOpening subprocess:"
658     p = Subprocess('cat', 1)            # set to expire noisily...
659     print p
660     print "\tOpening bogus subprocess, should fail:"
661     try:
662         b = Subprocess('/', 1)
663         print "\tOops!  Null-named subprocess startup *succeeded*?!?"
664     except SubprocessError:
665         print "\t...yep, it failed."
666     print '\tWrite, then read, two newline-teriminated lines, using readline:'
667     p.write('first full line written\n'); p.write('second.\n')
668     print `p.readline()`
669     print `p.readline()`
670     print '\tThree lines, last sans newline, read using combination:'
671     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
672     print '\tFirst line via readline:'
673     print `p.readline()`
674     print '\tRest via readPendingChars:'
675     print p.readPendingChars()
676     print "\tStopping then continuing subprocess (verbose):"
677     if not p.stop(1):                   # verbose stop
678         print '\t** Stop seems to have failed!'
679     else:
680         print '\tWriting line while subprocess is paused...'
681         p.write('written while subprocess paused\n')
682         print '\tNonblocking read of paused subprocess (should be empty):'
683         print p.readPendingChars()
684         print '\tContinuing subprocess (verbose):'
685         if not p.cont(1):               # verbose continue
686             print '\t** Continue seems to have failed!  Probly lost subproc...'
687             return p
688         else:
689             print '\tReading accumulated line, blocking read:'
690             print p.readline()
691             print "\tDeleting subproc, which was set to die noisily:"
692             del p
693             print "\tDone."
694             return None