]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Fix(subproc.py): Test bogus subprocess at the beginning
[bookmarks_db.git] / subproc.py
1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
2
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
4 routines.
5
6 Subprocess class features:
7
8  - provides non-blocking stdin and stderr reads
9
10  - provides subprocess stop and continue, kill-on-deletion
11
12  - provides detection of subprocess startup failure
13
14  - Subprocess objects have nice, informative string rep (as every good object
15    ought)."""
16
17 __version__ = "Revision: 1.15 "
18
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
21
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 #            and subprocesses, primarily one by jose pereira.
24
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 #   because i don't know what all in it is portable and what is not.  I'm not
28 #   about to provide for different platform contingencies - at that extent, the
29 #   effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 #           checked on regular IO, when a handler is defined, and passed to the
32 #           handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
34
35 # - new additions (1.3, 1.4):
36 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 #    reads based solely on os.read with select, while capitalizing big-time on
38 #    multi-char read chunking.
39 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
40 #
41 # ken.manheimer@nist.gov
42
43 # This is a modified version by Oleg Broytman <phd@phdru.name>.
44 # The original version is still preserved at
45 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
46
47 import sys, os, string, time, types
48 import select
49 import signal
50
51
52 class SubprocessError(Exception):
53     pass
54
55 # You may need to increase execvp_grace_seconds, if you have a large or slow
56 # path to search:
57 execvp_grace_seconds = 0.5
58
59 class Subprocess:
60     """Run and communicate asynchronously with a subprocess.
61
62     Provides non-blocking reads in the form of .readPendingChars and
63     .readPendingLine.
64
65     .readline will block until it gets a complete line.
66
67     .peekPendingChar does a non-blocking, non-consuming read for pending
68     output, and can be used before .readline to check non-destructively for
69     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
70     a new character is pending, or timeout secs pass, with granularity of
71     pollPause seconds.
72
73     There are corresponding read and peekPendingErrXXX routines, to read from
74     the subprocess stderr stream."""
75
76     pid = 0
77     cmd = ''
78     expire_noisily = 1                  # Announce subproc destruction?
79     pipefiles = []
80     readbuf = 0                         # fork will assign to be a readbuf obj
81     errbuf = 0                          # fork will assign to be a readbuf obj
82
83     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
84                  in_fd=0, out_fd=1, err_fd=2):
85         """Launch a subprocess, given command string COMMAND."""
86         self.cmd = cmd
87         self.pid = 0
88         self.expire_noisily = expire_noisily
89         self.control_stderr = control_stderr
90         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
91         self.fork()
92
93     def fork(self, cmd=None):
94         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
95         if cmd: self.cmd = cmd
96         else: cmd = self.cmd
97         cmd = string.split(self.cmd)
98         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
99         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
100         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
101         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
102
103         self.pid = os.fork()
104
105         if self.pid == 0:       #### CHILD ####
106             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
107             # Reopen stdin, out, err, on pipe ends:
108             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
109             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
110             if self.control_stderr:
111                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
112             # Ensure (within reason) stray file descriptors are closed:
113             excludes = [self.in_fd, self.out_fd, self.err_fd]
114             for i in range(4,100):
115                 if i not in excludes:
116                     try: os.close(i)
117                     except os.error: pass
118
119             try:
120                 os.execvp(cmd[0], cmd)
121                 os._exit(1)                     # Shouldn't get here
122
123             except os.error as e:
124                 if self.control_stderr:
125                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
126                 sys.stderr.write("**execvp failed, '%s'**\n" %
127                                  str(e))
128                 os._exit(1)
129             os._exit(1)                 # Shouldn't get here.
130
131         else:           ### PARENT ###
132             # Connect to the child's file descriptors, using our customized
133             # fdopen:
134             self.toChild = os.fdopen(pWc, 'w')
135             self.toChild_fdlist = [pWc]
136             self.readbuf = ReadBuf(pRc)
137             self.errbuf = ReadBuf(pRe)
138             time.sleep(execvp_grace_seconds)
139             try:
140                 pid, err = os.waitpid(self.pid, os.WNOHANG)
141             except os.error as error:
142                 errno, msg = error
143                 if errno == 10:
144                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
145                 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
146             if pid == self.pid:
147                 # child exited already
148                 self.pid == None
149                 sig = err & 0xff
150                 rc = (err & 0xff00) >> 8
151                 if sig:
152                     raise SubprocessError(
153                     "child killed by signal %d with a return code of %d"
154                     % (sig, rc))
155                 if rc:
156                     raise SubprocessError(
157                           "child exited with return code %d" % rc)
158                 # Child may have exited, but not in error, so we won't say
159                 # anything more at this point.
160
161     ### Write input to subprocess ###
162
163     def write(self, str):
164         """Write a STRING to the subprocess."""
165
166         if not self.pid:
167             raise SubprocessError("no child")                           # ===>
168         if select.select([],self.toChild_fdlist,[],0)[1]:
169             self.toChild.write(str)
170             self.toChild.flush()
171         else:
172             # XXX Can write-buffer full be handled better??
173             raise IOError("write to %s blocked" % self)                 # ===>
174
175     def writeline(self, line=''):
176         """Write STRING, with added newline termination, to subprocess."""
177         self.write(line + '\n')
178
179     ### Get output from subprocess ###
180
181     def peekPendingChar(self):
182         """Return, but (effectively) do not consume a single pending output
183         char, or return null string if none pending."""
184
185         return self.readbuf.peekPendingChar()                           # ===>
186     def peekPendingErrChar(self):
187         """Return, but (effectively) do not consume a single pending output
188         char, or return null string if none pending."""
189
190         return self.errbuf.peekPendingChar()                            # ===>
191
192     def waitForPendingChar(self, timeout, pollPause=.1):
193         """Block max TIMEOUT secs until we peek a pending char, returning the
194         char, or '' if none encountered.
195
196         Pause POLLPAUSE secs (default .1) between polls."""
197
198         accume = 0
199         while 1:
200             nextChar = self.readbuf.peekPendingChar()
201             if nextChar or (accume > timeout): return nextChar
202             time.sleep(pollPause)
203             accume = accume + pollPause
204
205     def read(self, n=None):
206         """Read N chars, or all pending if no N specified."""
207         if n == None:
208             return self.readPendingChars()
209         got = ''
210         while n:
211             got0 = self.readPendingChars(n)
212             got = got + got0
213             n = n - len(got0)
214         return got
215     def readPendingChars(self, max=None):
216         """Read all currently pending subprocess output as a single string."""
217         return self.readbuf.readPendingChars(max)
218     def readPendingErrChars(self):
219         """Read all currently pending subprocess error output as a single
220         string."""
221         if self.control_stderr:
222             return self.errbuf.readPendingChars()
223         else:
224             raise SubprocessError("Haven't grabbed subprocess error stream.")
225
226     def readPendingLine(self):
227         """Read currently pending subprocess output, up to a complete line
228         (newline inclusive)."""
229         return self.readbuf.readPendingLine()
230     def readPendingErrLine(self):
231         """Read currently pending subprocess error output, up to a complete
232         line (newline inclusive)."""
233         if self.control_stderr:
234             return self.errbuf.readPendingLine()
235         else:
236             raise SubprocessError("Haven't grabbed subprocess error stream.")
237
238     def readline(self):
239         """Return next complete line of subprocess output, blocking until
240         then."""
241         return self.readbuf.readline()
242     def readlineErr(self):
243         """Return next complete line of subprocess error output, blocking until
244         then."""
245         if self.control_stderr:
246             return self.errbuf.readline()
247         else:
248             raise SubprocessError("Haven't grabbed subprocess error stream.")
249
250     ### Subprocess Control ###
251
252     def active(self):
253         """True if subprocess is alive and kicking."""
254         return self.status(boolean=1)
255     def status(self, boolean=0):
256         """Return string indicating whether process is alive or dead."""
257         active = 0
258         if not self.cmd:
259             status = 'sans command'
260         elif not self.pid:
261             status = 'sans process'
262         elif not self.cont():
263             status = "(unresponding) '%s'" % self.cmd
264         else:
265             status = "'%s'" % self.cmd
266             active = 1
267         if boolean:
268             return active
269         else:
270             return status
271
272     def stop(self, verbose=1):
273         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
274         otherwise."""
275         try:
276             os.kill(self.pid, signal.SIGSTOP)
277         except os.error:
278             if verbose:
279                 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
280             return 0
281         if verbose: print("Stopped '%s'" % self.cmd)
282         return 'stopped'
283
284     def cont(self, verbose=0):
285         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
286         otherwise."""
287         try:
288             os.kill(self.pid, signal.SIGCONT)
289         except os.error:
290             if verbose:
291                 print(("Continue failed for '%s' - '%s'" %
292                        (self.cmd, sys.exc_value)))
293             return 0
294         if verbose: print("Continued '%s'" % self.cmd)
295         return 'continued'
296
297     def die(self):
298         """Send process PID signal SIG (default 9, 'kill'), returning None once
299          it is successfully reaped.
300
301         SubprocessError is raised if process is not successfully killed."""
302
303         if not self.pid:
304             raise SubprocessError("No process")                         # ===>
305         elif not self.cont():
306             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
307
308         # Try sending first a TERM and then a KILL signal.
309         keep_trying = 1
310         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
311         for sig in sigs:
312             try:
313                 os.kill(self.pid, sig[1])
314             except posix.error:
315                 # keep trying
316                 pass
317             # Try a couple or three times to reap the process with waitpid:
318             for i in range(5):
319                 # WNOHANG == 1 on sunos, presumably same elsewhere.
320                 if os.waitpid(self.pid, os.WNOHANG):
321                     if self.expire_noisily:
322                         print(("\n(%s subproc %d '%s' / %s)" %
323                                (sig[0], self.pid, self.cmd,
324                                 hex(id(self))[2:])))
325                     for i in self.pipefiles:
326                         os.close(i)
327                     self.pid = 0
328                     return None                                         # ===>
329                 time.sleep(.1)
330         # Only got here if subprocess is not gone:
331         raise SubprocessError(
332                "Failed kill of subproc %d, '%s', with signals %s" %
333                 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
334
335     def __del__(self):
336         """Terminate the subprocess"""
337         if self.pid:
338             self.die()
339
340     def __repr__(self):
341         status = self.status()
342         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
343
344 #############################################################################
345 #####                 Non-blocking read operations                      #####
346 #############################################################################
347
348 class ReadBuf:
349     """Output buffer for non-blocking reads on selectable files like pipes and
350     sockets.  Init with a file descriptor for the file."""
351
352     def __init__(self, fd, maxChunkSize=1024):
353         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
354         (default 1024)."""
355
356         if fd < 0:
357             raise ValueError
358         self.fd = fd
359         self.eof = 0                    # May be set with stuff still in .buf
360         self.buf = ''
361         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
362
363     def fileno(self):
364         return self.fd
365
366     def peekPendingChar(self):
367         """Return, but don't consume, first character of unconsumed output from
368         file, or empty string if none."""
369
370         if self.buf:
371             return self.buf[0]                                          # ===>
372
373         if self.eof:
374             return ''                                                   # ===>
375
376         sel = select.select([self.fd], [], [self.fd], 0)
377         if sel[2]:
378             self.eof = 1
379         if sel[0]:
380             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
381             return self.buf[0]          # Assume select don't lie.
382         else: return ''                                                 # ===>
383
384
385     def readPendingChar(self):
386         """Consume first character of unconsumed output from file, or empty
387         string if none."""
388
389         if self.buf:
390             got, self.buf = self.buf[0], self.buf[1:]
391             return got                                                  # ===>
392
393         if self.eof:
394             return ''                                                   # ===>
395
396         sel = select.select([self.fd], [], [self.fd], 0)
397         if sel[2]:
398             self.eof = 1
399         if sel[0]:
400             return os.read(self.fd, 1)                                  # ===>
401         else: return ''                                                 # ===>
402
403     def readPendingChars(self, max=None):
404         """Consume uncomsumed output from FILE, or empty string if nothing
405         pending."""
406
407         got = ""
408         if self.buf:
409             if (max > 0) and (len(self.buf) > max):
410                 got = self.buf[0:max]
411                 self.buf = self.buf[max:]
412             else:
413                 got, self.buf = self.buf, ''
414             return got
415
416         if self.eof:
417             return ''
418
419         sel = select.select([self.fd], [], [self.fd], 0)
420         if sel[2]:
421             self.eof = 1
422         if sel[0]:
423             got = got + os.read(self.fd, self.chunkSize)
424             if max == 0:
425                 self.buf = got
426                 return ''
427             elif max == None:
428                 return got
429             elif len(got) > max:
430                 self.buf = self.buf + got[max:]
431                 return got[:max]
432             else:
433                 return got
434         else: return ''
435
436     def readPendingLine(self, block=0):
437         """Return pending output from FILE, up to first newline (inclusive).
438
439         Does not block unless optional arg BLOCK is true.
440
441         Note that an error will be raised if a new eof is encountered without
442         any newline."""
443
444         if self.buf:
445             to = string.find(self.buf, '\n')
446             if to != -1:
447                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
448                 return got                                              # ===>
449             got, self.buf = self.buf, ''
450         else:
451             if self.eof:
452                 return ''                                               # ===>
453             got = ''
454
455         # Herein, 'got' contains the (former) contents of the buffer, and it
456         # doesn't include a newline.
457         fdlist = [self.fd]
458         period = block and 1.0 or 0     # don't be too busy while waiting
459         while 1:                        # (we'll only loop if block set)
460             sel = select.select(fdlist, [], fdlist, period)
461             if sel[2]:
462                 self.eof = 1
463             if sel[0]:
464                 got = got + os.read(self.fd, self.chunkSize)
465
466             to = string.find(got, '\n')
467             if to != -1:
468                 got, self.buf = got[:to+1], got[to+1:]
469                 return got                                              # ===>
470             if not block:
471                 return got                                              # ===>
472             if self.eof:
473                 self.buf = ''           # this is how an ordinary file acts...
474                 return got
475             # otherwise - no newline, blocking requested, no eof - loop. # ==^
476
477     def readline(self):
478         """Return next output line from file, blocking until it is received."""
479
480         return self.readPendingLine(1)                                  # ===>
481
482
483 #############################################################################
484 #####                 Encapsulated reading and writing                  #####
485 #############################################################################
486 # Encapsulate messages so the end can be unambiguously identified, even
487 # when they contain multiple, possibly empty lines.
488
489 class RecordFile:
490     """Encapsulate stream object for record-oriented IO.
491
492     Particularly useful when dealing with non-line oriented communications
493     over pipes, eg with subprocesses."""
494
495     # Message is written preceded by a line containing the message length.
496
497     def __init__(self, f):
498         self.file = f
499
500     def write_record(self, s):
501         "Write so self.read knows exactly how much to read."
502         f = self.__dict__['file']
503         f.write("%s\n%s" % (len(s), s))
504         if hasattr(f, 'flush'):
505             f.flush()
506
507     def read_record(self):
508         "Read and reconstruct message as prepared by self.write."
509         f = self.__dict__['file']
510         line = f.readline()[:-1]
511         if line:
512             try:
513                 l = string.atoi(line)
514             except ValueError:
515                 raise IOError(("corrupt %s file structure"
516                                 % self.__class__.__name__))
517             return f.read(l)
518         else:
519             # EOF.
520             return ''
521
522     def __getattr__(self, attr):
523         """Implement characteristic file object attributes."""
524         f = self.__dict__['file']
525         if hasattr(f, attr):
526             return getattr(f, attr)
527         else:
528             raise AttributeError(attr)
529
530     def __repr__(self):
531         return "<%s of %s at %s>" % (self.__class__.__name__,
532                                      self.__dict__['file'],
533                                      hex(id(self))[2:])
534
535 def record_trial(s):
536     """Exercise encapsulated write/read with an arbitrary string.
537
538     Raise IOError if the string gets distorted through transmission!"""
539     from StringIO import StringIO
540     sf = StringIO()
541     c = RecordFile(sf)
542     c.write(s)
543     c.seek(0)
544     r = c.read()
545     show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
546     if r != s:
547         raise IOError("String distorted:\n%s" % show)
548
549 #############################################################################
550 #####                   An example subprocess interfaces                #####
551 #############################################################################
552
553 class Ph:
554     """Convenient interface to CCSO 'ph' nameserver subprocess.
555
556     .query('string...') method takes a query and returns a list of dicts, each
557     of which represents one entry."""
558
559     # Note that i made this a class that handles a subprocess object, rather
560     # than one that inherits from it.  I didn't see any functional
561     # disadvantages, and didn't think that full support of the entire
562     # Subprocess functionality was in any way suitable for interaction with
563     # this specialized interface.  ?  klm 13-Jan-1995
564
565     def __init__(self):
566         try:
567             self.proc = Subprocess('ph', 1)
568         except:
569             raise SubprocessError('failure starting ph: %s' %         # ===>
570                                     str(sys.exc_value))
571
572     def query(self, q):
573         """Send a query and return a list of dicts for responses.
574
575         Raise a ValueError if ph responds with an error."""
576
577         self.clear()
578
579         self.proc.writeline('query ' + q)
580         got = []; it = {}
581         while 1:
582             response = self.getreply()  # Should get null on new prompt.
583             errs = self.proc.readPendingErrChars()
584             if errs:
585                 sys.stderr.write(errs)
586             if it:
587                 got.append(it)
588                 it = {}
589             if not response:
590                 return got                                              # ===>
591             elif type(response) == types.StringType:
592                 raise ValueError("ph failed match: '%s'" % response)    # ===>
593             for line in response:
594                 # convert to a dict:
595                 line = string.splitfields(line, ':')
596                 it[string.strip(line[0])] = (
597                     string.strip(string.join(line[1:])))
598
599     def getreply(self):
600         """Consume next response from ph, returning list of lines or string
601         err."""
602         # Key on first char:  (First line may lack newline.)
603         #  - dash       discard line
604         #  - 'ph> '     conclusion of response
605         #  - number     error message
606         #  - whitespace beginning of next response
607
608         nextChar = self.proc.waitForPendingChar(60)
609         if not nextChar:
610             raise SubprocessError('ph subprocess not responding')       # ===>
611         elif nextChar == '-':
612             # dashed line - discard it, and continue reading:
613             self.proc.readline()
614             return self.getreply()                                      # ===>
615         elif nextChar == 'p':
616             # 'ph> ' prompt - don't think we should hit this, but what the hay:
617             return ''                                                   # ===>
618         elif nextChar in '0123456789':
619             # Error notice - we're currently assuming single line errors:
620             return self.proc.readline()[:-1]                            # ===>
621         elif nextChar in ' \t':
622             # Get content, up to next dashed line:
623             got = []
624             while nextChar != '-' and nextChar != '':
625                 got.append(self.proc.readline()[:-1])
626                 nextChar = self.proc.peekPendingChar()
627             return got
628     def __repr__(self):
629         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
630                                              hex(id(self))[2:])
631     def clear(self):
632         """Clear-out initial preface or residual subproc input and output."""
633         pause = .5; maxIter = 10        # 5 seconds to clear
634         iterations = 0
635         got = ''
636         self.proc.write('')
637         while iterations < maxIter:
638             got = got + self.proc.readPendingChars()
639             # Strip out all but the last incomplete line:
640             got = string.splitfields(got, '\n')[-1]
641             if got == 'ph> ': return    # Ok.                             ===>
642             time.sleep(pause)
643         raise SubprocessError('ph not responding within %s secs' %
644                                 pause * maxIter)
645
646 #############################################################################
647 #####                             Test                                  #####
648 #############################################################################
649
650 def test(p=0):
651     print("\tOpening bogus subprocess, should fail:")
652     try:
653         b = Subprocess('/', 1)
654         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
655     except SubprocessError:
656         print("\t...yep, it failed.")
657     print("\tOpening cat subprocess:")
658     p = Subprocess('cat', 1)            # set to expire noisily...
659     print(p)
660     print('\tWrite, then read, two newline-teriminated lines, using readline:')
661     p.write('first full line written\n'); p.write('second.\n')
662     print(repr(p.readline()))
663     print(repr(p.readline()))
664     print('\tThree lines, last sans newline, read using combination:')
665     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
666     print('\tFirst line via readline:')
667     print(repr(p.readline()))
668     print('\tRest via readPendingChars:')
669     print(p.readPendingChars())
670     print("\tStopping then continuing subprocess (verbose):")
671     if not p.stop(1):                   # verbose stop
672         print('\t** Stop seems to have failed!')
673     else:
674         print('\tWriting line while subprocess is paused...')
675         p.write('written while subprocess paused\n')
676         print('\tNonblocking read of paused subprocess (should be empty):')
677         print(p.readPendingChars())
678         print('\tContinuing subprocess (verbose):')
679         if not p.cont(1):               # verbose continue
680             print('\t** Continue seems to have failed!  Probly lost subproc...')
681             return p
682         else:
683             print('\tReading accumulated line, blocking read:')
684             print(p.readline())
685             print("\tDeleting subproc, which was set to die noisily:")
686             del p
687             print("\tDone.")
688             return None