]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
a0099c290db088a168181802ee91153ac2b920c9
[bookmarks_db.git] / subproc.py
1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
2
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
4 routines.
5
6 Subprocess class features:
7
8  - provides non-blocking stdin and stderr reads
9
10  - provides subprocess stop and continue, kill-on-deletion
11
12  - provides detection of subprocess startup failure
13
14  - Subprocess objects have nice, informative string rep (as every good object
15    ought)."""
16
17 __version__ = "Revision: 1.15 "
18
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp 
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
21
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 #            and subprocesses, primarily one by jose pereira.
24
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 #   because i don't know what all in it is portable and what is not.  I'm not
28 #   about to provide for different platform contingencies - at that extent, the
29 #   effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 #           checked on regular IO, when a handler is defined, and passed to the
32 #           handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
34
35 # - new additions (1.3, 1.4):
36 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 #    reads based solely on os.read with select, while capitalizing big-time on
38 #    multi-char read chunking.
39 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
40 #
41 # ken.manheimer@nist.gov
42
43
44 import sys, os, string, time, types
45 import select
46 import signal
47
48
49 SubprocessError = 'SubprocessError'
50 # You may need to increase execvp_grace_seconds, if you have a large or slow
51 # path to search:
52 execvp_grace_seconds = 0.5
53
54 class Subprocess:
55     """Run and communicate asynchronously with a subprocess.
56
57     Provides non-blocking reads in the form of .readPendingChars and
58     .readPendingLine.
59
60     .readline will block until it gets a complete line.
61
62     .peekPendingChar does a non-blocking, non-consuming read for pending
63     output, and can be used before .readline to check non-destructively for
64     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
65     a new character is pending, or timeout secs pass, with granularity of
66     pollPause seconds.
67
68     There are corresponding read and peekPendingErrXXX routines, to read from
69     the subprocess stderr stream."""
70
71     pid = 0
72     cmd = ''
73     expire_noisily = 1                  # Announce subproc destruction?
74     pipefiles = []
75     readbuf = 0                         # fork will assign to be a readbuf obj
76     errbuf = 0                          # fork will assign to be a readbuf obj
77
78     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
79                  in_fd=0, out_fd=1, err_fd=2):
80         """Launch a subprocess, given command string COMMAND."""
81         self.cmd = cmd
82         self.pid = 0
83         self.expire_noisily = expire_noisily
84         self.control_stderr = control_stderr
85         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
86         self.fork()
87
88     def fork(self, cmd=None):
89         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
90         if cmd: self.cmd = cmd
91         else: cmd = self.cmd
92         cmd = string.split(self.cmd)
93         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
94         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
95         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
96         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
97
98         self.pid = os.fork()
99
100         if self.pid == 0:       #### CHILD ####
101             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
102             # Reopen stdin, out, err, on pipe ends:
103             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
104             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
105             if self.control_stderr:
106                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
107             # Ensure (within reason) stray file descriptors are closed:
108             excludes = [self.in_fd, self.out_fd, self.err_fd]
109             for i in range(4,100):
110                 if i not in excludes:
111                     try: os.close(i)
112                     except os.error: pass
113
114             try:
115                 os.execvp(cmd[0], cmd)
116                 os._exit(1)                     # Shouldn't get here
117
118             except os.error, e:
119                 if self.control_stderr:
120                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
121                 sys.stderr.write("**execvp failed, '%s'**\n" %
122                                  str(e))
123                 os._exit(1)
124             os._exit(1)                 # Shouldn't get here.
125
126         else:           ### PARENT ###
127             # Connect to the child's file descriptors, using our customized
128             # fdopen:
129             self.toChild = os.fdopen(pWc, 'w')
130             self.toChild_fdlist = [pWc]
131             self.readbuf = ReadBuf(pRc)
132             self.errbuf = ReadBuf(pRe)
133             time.sleep(execvp_grace_seconds)
134             try:
135                 pid, err = os.waitpid(self.pid, os.WNOHANG)
136             except os.error, (errno, msg):
137                 if errno == 10:
138                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
139                 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
140             if pid == self.pid:
141                 # child exited already
142                 self.pid == None
143                 sig = err & 0xff
144                 rc = (err & 0xff00) >> 8
145                 if sig:
146                     raise SubprocessError(
147                     "child killed by signal %d with a return code of %d"
148                     % (sig, rc))
149                 if rc:
150                     raise SubprocessError(
151                           "child exited with return code %d" % rc)
152                 # Child may have exited, but not in error, so we won't say
153                 # anything more at this point.
154
155     ### Write input to subprocess ###
156
157     def write(self, str):
158         """Write a STRING to the subprocess."""
159
160         if not self.pid:
161             raise SubprocessError("no child")                           # ===>
162         if select.select([],self.toChild_fdlist,[],0)[1]:
163             self.toChild.write(str)
164             self.toChild.flush()
165         else:
166             # XXX Can write-buffer full be handled better??
167             raise IOError("write to %s blocked" % self)                 # ===>
168
169     def writeline(self, line=''):
170         """Write STRING, with added newline termination, to subprocess."""
171         self.write(line + '\n')
172
173     ### Get output from subprocess ###
174
175     def peekPendingChar(self):
176         """Return, but (effectively) do not consume a single pending output
177         char, or return null string if none pending."""
178
179         return self.readbuf.peekPendingChar()                           # ===>
180     def peekPendingErrChar(self):
181         """Return, but (effectively) do not consume a single pending output
182         char, or return null string if none pending."""
183
184         return self.errbuf.peekPendingChar()                            # ===>
185
186     def waitForPendingChar(self, timeout, pollPause=.1):
187         """Block max TIMEOUT secs until we peek a pending char, returning the
188         char, or '' if none encountered.
189
190         Pause POLLPAUSE secs (default .1) between polls."""
191
192         accume = 0
193         while 1:
194             nextChar = self.readbuf.peekPendingChar()
195             if nextChar or (accume > timeout): return nextChar
196             time.sleep(pollPause)
197             accume = accume + pollPause
198
199     def read(self, n=None):
200         """Read N chars, or all pending if no N specified."""
201         if n == None:
202             return self.readPendingChars()
203         got = ''
204         while n:
205             got0 = self.readPendingChars(n)
206             got = got + got0
207             n = n - len(got0)
208         return got      
209     def readPendingChars(self, max=None):
210         """Read all currently pending subprocess output as a single string."""
211         return self.readbuf.readPendingChars(max)
212     def readPendingErrChars(self):
213         """Read all currently pending subprocess error output as a single
214         string."""
215         if self.control_stderr:
216             return self.errbuf.readPendingChars()
217         else:
218             raise SubprocessError("Haven't grabbed subprocess error stream.")
219
220     def readPendingLine(self):
221         """Read currently pending subprocess output, up to a complete line
222         (newline inclusive)."""
223         return self.readbuf.readPendingLine()
224     def readPendingErrLine(self):
225         """Read currently pending subprocess error output, up to a complete
226         line (newline inclusive)."""
227         if self.control_stderr:
228             return self.errbuf.readPendingLine()
229         else:
230             raise SubprocessError("Haven't grabbed subprocess error stream.")
231
232     def readline(self):
233         """Return next complete line of subprocess output, blocking until
234         then."""
235         return self.readbuf.readline()
236     def readlineErr(self):
237         """Return next complete line of subprocess error output, blocking until
238         then."""
239         if self.control_stderr:
240             return self.errbuf.readline()
241         else:
242             raise SubprocessError("Haven't grabbed subprocess error stream.")
243
244     ### Subprocess Control ###
245
246     def active(self):
247         """True if subprocess is alive and kicking."""
248         return self.status(boolean=1)
249     def status(self, boolean=0):
250         """Return string indicating whether process is alive or dead."""
251         active = 0
252         if not self.cmd:
253             status = 'sans command'
254         elif not self.pid:
255             status = 'sans process'
256         elif not self.cont():
257             status = "(unresponding) '%s'" % self.cmd
258         else:
259             status = "'%s'" % self.cmd
260             active = 1
261         if boolean:
262             return active
263         else:
264             return status
265
266     def stop(self, verbose=1):
267         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
268         otherwise."""
269         try:
270             os.kill(self.pid, signal.SIGSTOP)
271         except os.error:
272             if verbose:
273                 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
274             return 0
275         if verbose: print("Stopped '%s'" % self.cmd)
276         return 'stopped'
277
278     def cont(self, verbose=0):
279         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
280         otherwise."""
281         try:
282             os.kill(self.pid, signal.SIGCONT)
283         except os.error:
284             if verbose:
285                 print(("Continue failed for '%s' - '%s'" %
286                        (self.cmd, sys.exc_value)))
287             return 0
288         if verbose: print("Continued '%s'" % self.cmd)
289         return 'continued'
290
291     def die(self):
292         """Send process PID signal SIG (default 9, 'kill'), returning None once
293          it is successfully reaped.
294
295         SubprocessError is raised if process is not successfully killed."""
296
297         if not self.pid:
298             raise SubprocessError("No process")                         # ===>
299         elif not self.cont():
300             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
301
302         # Try sending first a TERM and then a KILL signal.
303         keep_trying = 1
304         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
305         for sig in sigs:
306             try:
307                 os.kill(self.pid, sig[1])
308             except posix.error:
309                 # keep trying
310                 pass
311             # Try a couple or three times to reap the process with waitpid:
312             for i in range(5):
313                 # WNOHANG == 1 on sunos, presumably same elsewhere.
314                 if os.waitpid(self.pid, os.WNOHANG):
315                     if self.expire_noisily:
316                         print(("\n(%s subproc %d '%s' / %s)" %
317                                (sig[0], self.pid, self.cmd,
318                                 hex(id(self))[2:])))
319                     for i in self.pipefiles:
320                         os.close(i)
321                     self.pid = 0
322                     return None                                         # ===>
323                 time.sleep(.1)
324         # Only got here if subprocess is not gone:
325         raise SubprocessError(
326                "Failed kill of subproc %d, '%s', with signals %s" %
327                 (self.pid, self.cmd, map(lambda(x): x[0], sigs)))
328
329     def __del__(self):
330         """Terminate the subprocess"""
331         if self.pid:
332             self.die()
333
334     def __repr__(self):
335         status = self.status()
336         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
337
338 #############################################################################
339 #####                 Non-blocking read operations                      #####
340 #############################################################################
341
342 class ReadBuf:
343     """Output buffer for non-blocking reads on selectable files like pipes and
344     sockets.  Init with a file descriptor for the file."""
345
346     def __init__(self, fd, maxChunkSize=1024):
347         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
348         (default 1024)."""
349
350         if fd < 0:
351             raise ValueError
352         self.fd = fd
353         self.eof = 0                    # May be set with stuff still in .buf
354         self.buf = ''
355         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
356
357     def fileno(self):
358         return self.fd
359
360     def peekPendingChar(self):
361         """Return, but don't consume, first character of unconsumed output from
362         file, or empty string if none."""
363
364         if self.buf:
365             return self.buf[0]                                          # ===>
366
367         if self.eof:
368             return ''                                                   # ===>
369
370         sel = select.select([self.fd], [], [self.fd], 0)
371         if sel[2]:
372             self.eof = 1
373         if sel[0]:
374             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
375             return self.buf[0]          # Assume select don't lie.
376         else: return ''                                                 # ===>
377
378
379     def readPendingChar(self):
380         """Consume first character of unconsumed output from file, or empty
381         string if none."""
382
383         if self.buf:
384             got, self.buf = self.buf[0], self.buf[1:]
385             return got                                                  # ===>
386
387         if self.eof:
388             return ''                                                   # ===>
389
390         sel = select.select([self.fd], [], [self.fd], 0)
391         if sel[2]:
392             self.eof = 1
393         if sel[0]:
394             return os.read(self.fd, 1)                                  # ===>
395         else: return ''                                                 # ===>
396
397     def readPendingChars(self, max=None):
398         """Consume uncomsumed output from FILE, or empty string if nothing
399         pending."""
400
401         got = ""
402         if self.buf:
403              if (max > 0) and (len(self.buf) > max):
404                  got = self.buf[0:max]
405                  self.buf = self.buf[max:]
406              else:
407                  got, self.buf = self.buf, ''
408              return got                                         
409
410         if self.eof:
411              return ''
412
413         sel = select.select([self.fd], [], [self.fd], 0)
414         if sel[2]:
415             self.eof = 1
416         if sel[0]:
417             got = got + os.read(self.fd, self.chunkSize)
418             if max == 0:
419                 self.buf = got
420                 return ''
421             elif max == None:
422                 return got
423             elif len(got) > max:
424                 self.buf = self.buf + got[max:]
425                 return got[:max]
426             else:
427                 return got
428         else: return ''
429
430     def readPendingLine(self, block=0):
431         """Return pending output from FILE, up to first newline (inclusive).
432
433         Does not block unless optional arg BLOCK is true.
434
435         Note that an error will be raised if a new eof is encountered without
436         any newline."""
437
438         if self.buf:
439             to = string.find(self.buf, '\n')
440             if to != -1:
441                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
442                 return got                                              # ===>
443             got, self.buf = self.buf, ''
444         else:
445             if self.eof:
446                 return ''                                               # ===>
447             got = ''
448
449         # Herein, 'got' contains the (former) contents of the buffer, and it
450         # doesn't include a newline.
451         fdlist = [self.fd]
452         period = block and 1.0 or 0     # don't be too busy while waiting
453         while 1:                        # (we'll only loop if block set)
454             sel = select.select(fdlist, [], fdlist, period)
455             if sel[2]:
456                 self.eof = 1
457             if sel[0]:
458                 got = got + os.read(self.fd, self.chunkSize)
459
460             to = string.find(got, '\n')
461             if to != -1:
462                 got, self.buf = got[:to+1], got[to+1:]
463                 return got                                              # ===>
464             if not block:
465                 return got                                              # ===>
466             if self.eof:
467                 self.buf = ''           # this is how an ordinary file acts...
468                 return got
469             # otherwise - no newline, blocking requested, no eof - loop. # ==^
470
471     def readline(self):
472         """Return next output line from file, blocking until it is received."""
473
474         return self.readPendingLine(1)                                  # ===>
475
476
477 #############################################################################
478 #####                 Encapsulated reading and writing                  #####
479 #############################################################################
480 # Encapsulate messages so the end can be unambiguously identified, even
481 # when they contain multiple, possibly empty lines.
482
483 class RecordFile:
484     """Encapsulate stream object for record-oriented IO.
485
486     Particularly useful when dealing with non-line oriented communications
487     over pipes, eg with subprocesses."""
488
489     # Message is written preceded by a line containing the message length.
490
491     def __init__(self, f):
492         self.file = f
493
494     def write_record(self, s):
495         "Write so self.read knows exactly how much to read."
496         f = self.__dict__['file']
497         f.write("%s\n%s" % (len(s), s))
498         if hasattr(f, 'flush'):
499             f.flush()
500
501     def read_record(self):
502         "Read and reconstruct message as prepared by self.write."
503         f = self.__dict__['file']
504         line = f.readline()[:-1]
505         if line:
506             try:
507                 l = string.atoi(line)
508             except ValueError:
509                 raise IOError(("corrupt %s file structure"
510                                 % self.__class__.__name__))
511             return f.read(l)
512         else:
513             # EOF.
514             return ''
515
516     def __getattr__(self, attr):
517         """Implement characteristic file object attributes."""
518         f = self.__dict__['file']
519         if hasattr(f, attr):
520             return getattr(f, attr)
521         else:
522             raise AttributeError(attr)
523
524     def __repr__(self):
525         return "<%s of %s at %s>" % (self.__class__.__name__,
526                                      self.__dict__['file'],
527                                      hex(id(self))[2:])
528
529 def record_trial(s):
530     """Exercise encapsulated write/read with an arbitrary string.
531
532     Raise IOError if the string gets distorted through transmission!"""
533     from StringIO import StringIO
534     sf = StringIO()
535     c = RecordFile(sf)
536     c.write(s)
537     c.seek(0)
538     r = c.read()
539     show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
540     if r != s:
541         raise IOError("String distorted:\n%s" % show)
542
543 #############################################################################
544 #####                   An example subprocess interfaces                #####
545 #############################################################################
546
547 class Ph:
548     """Convenient interface to CCSO 'ph' nameserver subprocess.
549
550     .query('string...') method takes a query and returns a list of dicts, each
551     of which represents one entry."""
552
553     # Note that i made this a class that handles a subprocess object, rather
554     # than one that inherits from it.  I didn't see any functional
555     # disadvantages, and didn't think that full support of the entire
556     # Subprocess functionality was in any way suitable for interaction with
557     # this specialized interface.  ?  klm 13-Jan-1995
558
559     def __init__(self):
560         try:
561             self.proc = Subprocess('ph', 1)
562         except:
563             raise SubprocessError('failure starting ph: %s' %         # ===>
564                                     str(sys.exc_value))
565
566     def query(self, q):
567         """Send a query and return a list of dicts for responses.
568
569         Raise a ValueError if ph responds with an error."""
570
571         self.clear()
572
573         self.proc.writeline('query ' + q)
574         got = []; it = {}
575         while 1:
576             response = self.getreply()  # Should get null on new prompt.
577             errs = self.proc.readPendingErrChars()
578             if errs:
579                 sys.stderr.write(errs)
580             if it:
581                 got.append(it)
582                 it = {}
583             if not response:
584                 return got                                              # ===>
585             elif type(response) == types.StringType:
586                 raise ValueError("ph failed match: '%s'" % response)    # ===>
587             for line in response:
588                 # convert to a dict:
589                 line = string.splitfields(line, ':')
590                 it[string.strip(line[0])] = (
591                     string.strip(string.join(line[1:])))
592         
593     def getreply(self):
594         """Consume next response from ph, returning list of lines or string
595         err."""
596         # Key on first char:  (First line may lack newline.)
597         #  - dash       discard line
598         #  - 'ph> '     conclusion of response
599         #  - number     error message
600         #  - whitespace beginning of next response
601
602         nextChar = self.proc.waitForPendingChar(60)
603         if not nextChar:
604             raise SubprocessError('ph subprocess not responding')       # ===>
605         elif nextChar == '-':
606             # dashed line - discard it, and continue reading:
607             self.proc.readline()
608             return self.getreply()                                      # ===>
609         elif nextChar == 'p':
610             # 'ph> ' prompt - don't think we should hit this, but what the hay:
611             return ''                                                   # ===>
612         elif nextChar in '0123456789':
613             # Error notice - we're currently assuming single line errors:
614             return self.proc.readline()[:-1]                            # ===>
615         elif nextChar in ' \t':
616             # Get content, up to next dashed line:
617             got = []
618             while nextChar != '-' and nextChar != '':
619                 got.append(self.proc.readline()[:-1])
620                 nextChar = self.proc.peekPendingChar()
621             return got
622     def __repr__(self):
623         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
624                                              hex(id(self))[2:])
625     def clear(self):
626         """Clear-out initial preface or residual subproc input and output."""
627         pause = .5; maxIter = 10        # 5 seconds to clear
628         iterations = 0
629         got = ''
630         self.proc.write('')
631         while iterations < maxIter:
632             got = got + self.proc.readPendingChars()
633             # Strip out all but the last incomplete line:
634             got = string.splitfields(got, '\n')[-1]
635             if got == 'ph> ': return    # Ok.                             ===>
636             time.sleep(pause)
637         raise SubprocessError('ph not responding within %s secs' %
638                                 pause * maxIter)
639
640 #############################################################################
641 #####                             Test                                  #####
642 #############################################################################
643
644 def test(p=0):
645     print("\tOpening subprocess:")
646     p = Subprocess('cat', 1)            # set to expire noisily...
647     print(p)
648     print("\tOpening bogus subprocess, should fail:")
649     try:
650         b = Subprocess('/', 1)
651         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
652     except SubprocessError:
653         print("\t...yep, it failed.")
654     print('\tWrite, then read, two newline-teriminated lines, using readline:')
655     p.write('first full line written\n'); p.write('second.\n')
656     print(`p.readline()`)
657     print(`p.readline()`)
658     print('\tThree lines, last sans newline, read using combination:')
659     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
660     print('\tFirst line via readline:')
661     print(`p.readline()`)
662     print('\tRest via readPendingChars:')
663     print(p.readPendingChars())
664     print("\tStopping then continuing subprocess (verbose):")
665     if not p.stop(1):                   # verbose stop
666         print('\t** Stop seems to have failed!')
667     else:
668         print('\tWriting line while subprocess is paused...')
669         p.write('written while subprocess paused\n')
670         print('\tNonblocking read of paused subprocess (should be empty):')
671         print(p.readPendingChars())
672         print('\tContinuing subprocess (verbose):')
673         if not p.cont(1):               # verbose continue
674             print('\t** Continue seems to have failed!  Probly lost subproc...')
675             return p
676         else:
677             print('\tReading accumulated line, blocking read:')
678             print(p.readline())
679             print("\tDeleting subproc, which was set to die noisily:")
680             del p
681             print("\tDone.")
682             return None