]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Minor refactoring
[bookmarks_db.git] / subproc.py
1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
2
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
4 routines.
5
6 Subprocess class features:
7
8  - provides non-blocking stdin and stderr reads
9
10  - provides subprocess stop and continue, kill-on-deletion
11
12  - provides detection of subprocess startup failure
13
14  - Subprocess objects have nice, informative string rep (as every good object
15    ought)."""
16
17 __version__ = "Revision: 1.15 "
18
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp 
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
21
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 #            and subprocesses, primarily one by jose pereira.
24
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 #   because i don't know what all in it is portable and what is not.  I'm not
28 #   about to provide for different platform contingencies - at that extent, the
29 #   effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 #           checked on regular IO, when a handler is defined, and passed to the
32 #           handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
34
35 # - new additions (1.3, 1.4):
36 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 #    reads based solely on os.read with select, while capitalizing big-time on
38 #    multi-char read chunking.
39 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
40 #
41 # ken.manheimer@nist.gov
42
43
44 import sys, os, string, time, types
45 import select
46 import signal
47
48
49 SubprocessError = 'SubprocessError'
50 # You may need to increase execvp_grace_seconds, if you have a large or slow
51 # path to search:
52 execvp_grace_seconds = 0.5
53
54 class Subprocess:
55     """Run and communicate asynchronously with a subprocess.
56
57     Provides non-blocking reads in the form of .readPendingChars and
58     .readPendingLine.
59
60     .readline will block until it gets a complete line.
61
62     .peekPendingChar does a non-blocking, non-consuming read for pending
63     output, and can be used before .readline to check non-destructively for
64     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
65     a new character is pending, or timeout secs pass, with granularity of
66     pollPause seconds.
67
68     There are corresponding read and peekPendingErrXXX routines, to read from
69     the subprocess stderr stream."""
70
71     pid = 0
72     cmd = ''
73     expire_noisily = 1                  # Announce subproc destruction?
74     pipefiles = []
75     readbuf = 0                         # fork will assign to be a readbuf obj
76     errbuf = 0                          # fork will assign to be a readbuf obj
77
78     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
79                  in_fd=0, out_fd=1, err_fd=2):
80         """Launch a subprocess, given command string COMMAND."""
81         self.cmd = cmd
82         self.pid = 0
83         self.expire_noisily = expire_noisily
84         self.control_stderr = control_stderr
85         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
86         self.fork()
87
88     def fork(self, cmd=None):
89         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
90         if cmd: self.cmd = cmd
91         else: cmd = self.cmd
92         cmd = string.split(self.cmd)
93         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
94         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
95         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
96         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
97
98         self.pid = os.fork()
99
100         if self.pid == 0:       #### CHILD ####
101             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
102             # Reopen stdin, out, err, on pipe ends:
103             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
104             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
105             if self.control_stderr:
106                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
107             # Ensure (within reason) stray file descriptors are closed:
108             excludes = [self.in_fd, self.out_fd, self.err_fd]
109             for i in range(4,100):
110                 if i not in excludes:
111                     try: os.close(i)
112                     except os.error: pass
113
114             try:
115                 os.execvp(cmd[0], cmd)
116                 os._exit(1)                     # Shouldn't get here
117
118             except os.error, e:
119                 if self.control_stderr:
120                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
121                 sys.stderr.write("**execvp failed, '%s'**\n" %
122                                  str(e))
123                 os._exit(1)
124             os._exit(1)                 # Shouldn't get here.
125
126         else:           ### PARENT ###
127             # Connect to the child's file descriptors, using our customized
128             # fdopen:
129             self.toChild = os.fdopen(pWc, 'w')
130             self.toChild_fdlist = [pWc]
131             self.readbuf = ReadBuf(pRc)
132             self.errbuf = ReadBuf(pRe)
133             time.sleep(execvp_grace_seconds)
134             try:
135                 pid, err = os.waitpid(self.pid, os.WNOHANG)
136             except os.error, (errno, msg):
137                 if errno == 10:
138                     raise SubprocessError, \
139                           "Subprocess '%s' failed." % self.cmd
140                 raise SubprocessError, \
141                       "Subprocess failed[%d]: %s" % (errno, msg)
142             if pid == self.pid:
143                 # child exited already
144                 self.pid == None
145                 sig = err & 0xff
146                 rc = (err & 0xff00) >> 8
147                 if sig:
148                     raise SubprocessError, (
149                     "child killed by signal %d with a return code of %d"
150                     % (sig, rc))
151                 if rc:
152                     raise SubprocessError, \
153                           "child exited with return code %d" % rc
154                 # Child may have exited, but not in error, so we won't say
155                 # anything more at this point.
156
157     ### Write input to subprocess ###
158
159     def write(self, str):
160         """Write a STRING to the subprocess."""
161
162         if not self.pid:
163             raise SubprocessError, "no child"                           # ===>
164         if select.select([],self.toChild_fdlist,[],0)[1]:
165             self.toChild.write(str)
166             self.toChild.flush()
167         else:
168             # XXX Can write-buffer full be handled better??
169             raise IOError, "write to %s blocked" % self                 # ===>
170
171     def writeline(self, line=''):
172         """Write STRING, with added newline termination, to subprocess."""
173         self.write(line + '\n')
174
175     ### Get output from subprocess ###
176
177     def peekPendingChar(self):
178         """Return, but (effectively) do not consume a single pending output
179         char, or return null string if none pending."""
180
181         return self.readbuf.peekPendingChar()                           # ===>
182     def peekPendingErrChar(self):
183         """Return, but (effectively) do not consume a single pending output
184         char, or return null string if none pending."""
185
186         return self.errbuf.peekPendingChar()                            # ===>
187
188     def waitForPendingChar(self, timeout, pollPause=.1):
189         """Block max TIMEOUT secs until we peek a pending char, returning the
190         char, or '' if none encountered.
191
192         Pause POLLPAUSE secs (default .1) between polls."""
193
194         accume = 0
195         while 1:
196             nextChar = self.readbuf.peekPendingChar()
197             if nextChar or (accume > timeout): return nextChar
198             time.sleep(pollPause)
199             accume = accume + pollPause
200
201     def read(self, n=None):
202         """Read N chars, or all pending if no N specified."""
203         if n == None:
204             return self.readPendingChars()
205         got = ''
206         while n:
207             got0 = self.readPendingChars(n)
208             got = got + got0
209             n = n - len(got0)
210         return got      
211     def readPendingChars(self, max=None):
212         """Read all currently pending subprocess output as a single string."""
213         return self.readbuf.readPendingChars(max)
214     def readPendingErrChars(self):
215         """Read all currently pending subprocess error output as a single
216         string."""
217         if self.control_stderr:
218             return self.errbuf.readPendingChars()
219         else:
220             raise SubprocessError, "Haven't grabbed subprocess error stream."
221
222     def readPendingLine(self):
223         """Read currently pending subprocess output, up to a complete line
224         (newline inclusive)."""
225         return self.readbuf.readPendingLine()
226     def readPendingErrLine(self):
227         """Read currently pending subprocess error output, up to a complete
228         line (newline inclusive)."""
229         if self.control_stderr:
230             return self.errbuf.readPendingLine()
231         else:
232             raise SubprocessError, "Haven't grabbed subprocess error stream."
233
234     def readline(self):
235         """Return next complete line of subprocess output, blocking until
236         then."""
237         return self.readbuf.readline()
238     def readlineErr(self):
239         """Return next complete line of subprocess error output, blocking until
240         then."""
241         if self.control_stderr:
242             return self.errbuf.readline()
243         else:
244             raise SubprocessError, "Haven't grabbed subprocess error stream."
245
246     ### Subprocess Control ###
247
248     def active(self):
249         """True if subprocess is alive and kicking."""
250         return self.status(boolean=1)
251     def status(self, boolean=0):
252         """Return string indicating whether process is alive or dead."""
253         active = 0
254         if not self.cmd:
255             status = 'sans command'
256         elif not self.pid:
257             status = 'sans process'
258         elif not self.cont():
259             status = "(unresponding) '%s'" % self.cmd
260         else:
261             status = "'%s'" % self.cmd
262             active = 1
263         if boolean:
264             return active
265         else:
266             return status
267
268     def stop(self, verbose=1):
269         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
270         otherwise."""
271         try:
272             os.kill(self.pid, signal.SIGSTOP)
273         except os.error:
274             if verbose:
275                 print "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value)
276             return 0
277         if verbose: print "Stopped '%s'" % self.cmd
278         return 'stopped'
279
280     def cont(self, verbose=0):
281         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
282         otherwise."""
283         try:
284             os.kill(self.pid, signal.SIGCONT)
285         except os.error:
286             if verbose:
287                 print ("Continue failed for '%s' - '%s'" %
288                        (self.cmd, sys.exc_value))
289             return 0
290         if verbose: print "Continued '%s'" % self.cmd
291         return 'continued'
292
293     def die(self):
294         """Send process PID signal SIG (default 9, 'kill'), returning None once
295          it is successfully reaped.
296
297         SubprocessError is raised if process is not successfully killed."""
298
299         if not self.pid:
300             raise SubprocessError, "No process"                         # ===>
301         elif not self.cont():
302             raise SubprocessError, "Can't signal subproc %s" % self     # ===>
303
304         # Try sending first a TERM and then a KILL signal.
305         keep_trying = 1
306         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
307         for sig in sigs:
308             try:
309                 os.kill(self.pid, sig[1])
310             except posix.error:
311                 # keep trying
312                 pass
313             # Try a couple or three times to reap the process with waitpid:
314             for i in range(5):
315                 # WNOHANG == 1 on sunos, presumably same elsewhere.
316                 if os.waitpid(self.pid, os.WNOHANG):
317                     if self.expire_noisily:
318                         print ("\n(%s subproc %d '%s' / %s)" %
319                                (sig[0], self.pid, self.cmd,
320                                 hex(id(self))[2:]))
321                     for i in self.pipefiles:
322                         os.close(i)
323                     self.pid = 0
324                     return None                                         # ===>
325                 time.sleep(.1)
326         # Only got here if subprocess is not gone:
327         raise (SubprocessError,
328                ("Failed kill of subproc %d, '%s', with signals %s" %
329                 (self.pid, self.cmd, map(lambda(x): x[0], sigs))))
330
331     def __del__(self):
332         """Terminate the subprocess"""
333         if self.pid:
334             self.die()
335
336     def __repr__(self):
337         status = self.status()
338         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
339
340 #############################################################################
341 #####                 Non-blocking read operations                      #####
342 #############################################################################
343
344 class ReadBuf:
345     """Output buffer for non-blocking reads on selectable files like pipes and
346     sockets.  Init with a file descriptor for the file."""
347
348     def __init__(self, fd, maxChunkSize=1024):
349         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
350         (default 1024)."""
351
352         if fd < 0:
353             raise ValueError
354         self.fd = fd
355         self.eof = 0                    # May be set with stuff still in .buf
356         self.buf = ''
357         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
358
359     def fileno(self):
360         return self.fd
361
362     def peekPendingChar(self):
363         """Return, but don't consume, first character of unconsumed output from
364         file, or empty string if none."""
365
366         if self.buf:
367             return self.buf[0]                                          # ===>
368
369         if self.eof:
370             return ''                                                   # ===>
371
372         sel = select.select([self.fd], [], [self.fd], 0)
373         if sel[2]:
374             self.eof = 1
375         if sel[0]:
376             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
377             return self.buf[0]          # Assume select don't lie.
378         else: return ''                                                 # ===>
379
380
381     def readPendingChar(self):
382         """Consume first character of unconsumed output from file, or empty
383         string if none."""
384
385         if self.buf:
386             got, self.buf = self.buf[0], self.buf[1:]
387             return got                                                  # ===>
388
389         if self.eof:
390             return ''                                                   # ===>
391
392         sel = select.select([self.fd], [], [self.fd], 0)
393         if sel[2]:
394             self.eof = 1
395         if sel[0]:
396             return os.read(self.fd, 1)                                  # ===>
397         else: return ''                                                 # ===>
398
399     def readPendingChars(self, max=None):
400         """Consume uncomsumed output from FILE, or empty string if nothing
401         pending."""
402
403         got = ""
404         if self.buf:
405              if (max > 0) and (len(self.buf) > max):
406                  got = self.buf[0:max]
407                  self.buf = self.buf[max:]
408              else:
409                  got, self.buf = self.buf, ''
410              return got                                         
411
412         if self.eof:
413              return ''
414
415         sel = select.select([self.fd], [], [self.fd], 0)
416         if sel[2]:
417             self.eof = 1
418         if sel[0]:
419             got = got + os.read(self.fd, self.chunkSize)
420             if max == 0:
421                 self.buf = got
422                 return ''
423             elif max == None:
424                 return got
425             elif len(got) > max:
426                 self.buf = self.buf + got[max:]
427                 return got[:max]
428             else:
429                 return got
430         else: return ''
431
432     def readPendingLine(self, block=0):
433         """Return pending output from FILE, up to first newline (inclusive).
434
435         Does not block unless optional arg BLOCK is true.
436
437         Note that an error will be raised if a new eof is encountered without
438         any newline."""
439
440         if self.buf:
441             to = string.find(self.buf, '\n')
442             if to != -1:
443                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
444                 return got                                              # ===>
445             got, self.buf = self.buf, ''
446         else:
447             if self.eof:
448                 return ''                                               # ===>
449             got = ''
450
451         # Herein, 'got' contains the (former) contents of the buffer, and it
452         # doesn't include a newline.
453         fdlist = [self.fd]
454         period = block and 1.0 or 0     # don't be too busy while waiting
455         while 1:                        # (we'll only loop if block set)
456             sel = select.select(fdlist, [], fdlist, period)
457             if sel[2]:
458                 self.eof = 1
459             if sel[0]:
460                 got = got + os.read(self.fd, self.chunkSize)
461
462             to = string.find(got, '\n')
463             if to != -1:
464                 got, self.buf = got[:to+1], got[to+1:]
465                 return got                                              # ===>
466             if not block:
467                 return got                                              # ===>
468             if self.eof:
469                 self.buf = ''           # this is how an ordinary file acts...
470                 return got
471             # otherwise - no newline, blocking requested, no eof - loop. # ==^
472
473     def readline(self):
474         """Return next output line from file, blocking until it is received."""
475
476         return self.readPendingLine(1)                                  # ===>
477
478
479 #############################################################################
480 #####                 Encapsulated reading and writing                  #####
481 #############################################################################
482 # Encapsulate messages so the end can be unambiguously identified, even
483 # when they contain multiple, possibly empty lines.
484
485 class RecordFile:
486     """Encapsulate stream object for record-oriented IO.
487
488     Particularly useful when dealing with non-line oriented communications
489     over pipes, eg with subprocesses."""
490
491     # Message is written preceded by a line containing the message length.
492
493     def __init__(self, f):
494         self.file = f
495
496     def write_record(self, s):
497         "Write so self.read knows exactly how much to read."
498         f = self.__dict__['file']
499         f.write("%s\n%s" % (len(s), s))
500         if hasattr(f, 'flush'):
501             f.flush()
502
503     def read_record(self):
504         "Read and reconstruct message as prepared by self.write."
505         f = self.__dict__['file']
506         line = f.readline()[:-1]
507         if line:
508             try:
509                 l = string.atoi(line)
510             except ValueError:
511                 raise IOError, ("corrupt %s file structure"
512                                 % self.__class__.__name__)
513             return f.read(l)
514         else:
515             # EOF.
516             return ''
517
518     def __getattr__(self, attr):
519         """Implement characteristic file object attributes."""
520         f = self.__dict__['file']
521         if hasattr(f, attr):
522             return getattr(f, attr)
523         else:
524             raise AttributeError, attr
525
526     def __repr__(self):
527         return "<%s of %s at %s>" % (self.__class__.__name__,
528                                      self.__dict__['file'],
529                                      hex(id(self))[2:])
530
531 def record_trial(s):
532     """Exercise encapsulated write/read with an arbitrary string.
533
534     Raise IOError if the string gets distorted through transmission!"""
535     from StringIO import StringIO
536     sf = StringIO()
537     c = RecordFile(sf)
538     c.write(s)
539     c.seek(0)
540     r = c.read()
541     show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
542     if r != s:
543         raise IOError, "String distorted:\n%s" % show
544
545 #############################################################################
546 #####                   An example subprocess interfaces                #####
547 #############################################################################
548
549 class Ph:
550     """Convenient interface to CCSO 'ph' nameserver subprocess.
551
552     .query('string...') method takes a query and returns a list of dicts, each
553     of which represents one entry."""
554
555     # Note that i made this a class that handles a subprocess object, rather
556     # than one that inherits from it.  I didn't see any functional
557     # disadvantages, and didn't think that full support of the entire
558     # Subprocess functionality was in any way suitable for interaction with
559     # this specialized interface.  ?  klm 13-Jan-1995
560
561     def __init__(self):
562         try:
563             self.proc = Subprocess('ph', 1)
564         except:
565             raise SubprocessError, ('failure starting ph: %s' %         # ===>
566                                     str(sys.exc_value))
567
568     def query(self, q):
569         """Send a query and return a list of dicts for responses.
570
571         Raise a ValueError if ph responds with an error."""
572
573         self.clear()
574
575         self.proc.writeline('query ' + q)
576         got = []; it = {}
577         while 1:
578             response = self.getreply()  # Should get null on new prompt.
579             errs = self.proc.readPendingErrChars()
580             if errs:
581                 sys.stderr.write(errs)
582             if it:
583                 got.append(it)
584                 it = {}
585             if not response:
586                 return got                                              # ===>
587             elif type(response) == types.StringType:
588                 raise ValueError, "ph failed match: '%s'" % response    # ===>
589             for line in response:
590                 # convert to a dict:
591                 line = string.splitfields(line, ':')
592                 it[string.strip(line[0])] = (
593                     string.strip(string.join(line[1:])))
594         
595     def getreply(self):
596         """Consume next response from ph, returning list of lines or string
597         err."""
598         # Key on first char:  (First line may lack newline.)
599         #  - dash       discard line
600         #  - 'ph> '     conclusion of response
601         #  - number     error message
602         #  - whitespace beginning of next response
603
604         nextChar = self.proc.waitForPendingChar(60)
605         if not nextChar:
606             raise SubprocessError, 'ph subprocess not responding'       # ===>
607         elif nextChar == '-':
608             # dashed line - discard it, and continue reading:
609             self.proc.readline()
610             return self.getreply()                                      # ===>
611         elif nextChar == 'p':
612             # 'ph> ' prompt - don't think we should hit this, but what the hay:
613             return ''                                                   # ===>
614         elif nextChar in '0123456789':
615             # Error notice - we're currently assuming single line errors:
616             return self.proc.readline()[:-1]                            # ===>
617         elif nextChar in ' \t':
618             # Get content, up to next dashed line:
619             got = []
620             while nextChar != '-' and nextChar != '':
621                 got.append(self.proc.readline()[:-1])
622                 nextChar = self.proc.peekPendingChar()
623             return got
624     def __repr__(self):
625         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
626                                              hex(id(self))[2:])
627     def clear(self):
628         """Clear-out initial preface or residual subproc input and output."""
629         pause = .5; maxIter = 10        # 5 seconds to clear
630         iterations = 0
631         got = ''
632         self.proc.write('')
633         while iterations < maxIter:
634             got = got + self.proc.readPendingChars()
635             # Strip out all but the last incomplete line:
636             got = string.splitfields(got, '\n')[-1]
637             if got == 'ph> ': return    # Ok.                             ===>
638             time.sleep(pause)
639         raise SubprocessError, ('ph not responding within %s secs' %
640                                 pause * maxIter)
641
642 #############################################################################
643 #####                             Test                                  #####
644 #############################################################################
645
646 def test(p=0):
647     print "\tOpening subprocess:"
648     p = Subprocess('cat', 1)            # set to expire noisily...
649     print p
650     print "\tOpening bogus subprocess, should fail:"
651     try:
652         b = Subprocess('/', 1)
653         print "\tOops!  Null-named subprocess startup *succeeded*?!?"
654     except SubprocessError:
655         print "\t...yep, it failed."
656     print '\tWrite, then read, two newline-teriminated lines, using readline:'
657     p.write('first full line written\n'); p.write('second.\n')
658     print `p.readline()`
659     print `p.readline()`
660     print '\tThree lines, last sans newline, read using combination:'
661     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
662     print '\tFirst line via readline:'
663     print `p.readline()`
664     print '\tRest via readPendingChars:'
665     print p.readPendingChars()
666     print "\tStopping then continuing subprocess (verbose):"
667     if not p.stop(1):                   # verbose stop
668         print '\t** Stop seems to have failed!'
669     else:
670         print '\tWriting line while subprocess is paused...'
671         p.write('written while subprocess paused\n')
672         print '\tNonblocking read of paused subprocess (should be empty):'
673         print p.readPendingChars()
674         print '\tContinuing subprocess (verbose):'
675         if not p.cont(1):               # verbose continue
676             print '\t** Continue seems to have failed!  Probly lost subproc...'
677             return p
678         else:
679             print '\tReading accumulated line, blocking read:'
680             print p.readline()
681             print "\tDeleting subproc, which was set to die noisily:"
682             del p
683             print "\tDone."
684             return None