]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Fix(subproc.py): Clear pid to avoid repeated killing
[bookmarks_db.git] / subproc.py
1 #! /usr/bin/env python
2
3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
4
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 routines.
7
8 Subprocess class features:
9
10  - provides non-blocking stdin and stderr reads
11
12  - provides subprocess stop and continue, kill-on-deletion
13
14  - provides detection of subprocess startup failure
15
16  - Subprocess objects have nice, informative string rep (as every good object
17    ought)."""
18
19 __version__ = "Revision: 1.15 "
20
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
23
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 #            and subprocesses, primarily one by jose pereira.
26
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 #   because i don't know what all in it is portable and what is not.  I'm not
30 #   about to provide for different platform contingencies - at that extent, the
31 #   effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 #           checked on regular IO, when a handler is defined, and passed to the
34 #           handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
36
37 # - new additions (1.3, 1.4):
38 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 #    reads based solely on os.read with select, while capitalizing big-time on
40 #    multi-char read chunking.
41 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
42 #
43 # ken.manheimer@nist.gov
44
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
48
49 import sys, os, string, time, types
50 import select
51 import signal
52
53
54 class SubprocessError(Exception):
55     pass
56
57 # You may need to increase execvp_grace_seconds, if you have a large or slow
58 # path to search:
59 execvp_grace_seconds = 0.5
60
61 class Subprocess:
62     """Run and communicate asynchronously with a subprocess.
63
64     Provides non-blocking reads in the form of .readPendingChars and
65     .readPendingLine.
66
67     .readline will block until it gets a complete line.
68
69     .peekPendingChar does a non-blocking, non-consuming read for pending
70     output, and can be used before .readline to check non-destructively for
71     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
72     a new character is pending, or timeout secs pass, with granularity of
73     pollPause seconds.
74
75     There are corresponding read and peekPendingErrXXX routines, to read from
76     the subprocess stderr stream."""
77
78     pid = 0
79     cmd = ''
80     expire_noisily = 1                  # Announce subproc destruction?
81     pipefiles = []
82     readbuf = 0                         # fork will assign to be a readbuf obj
83     errbuf = 0                          # fork will assign to be a readbuf obj
84
85     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
86                  in_fd=0, out_fd=1, err_fd=2):
87         """Launch a subprocess, given command string COMMAND."""
88         self.cmd = cmd
89         self.pid = 0
90         self.expire_noisily = expire_noisily
91         self.control_stderr = control_stderr
92         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
93         self.fork()
94
95     def fork(self, cmd=None):
96         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
97         if cmd: self.cmd = cmd
98         else: cmd = self.cmd
99         cmd = string.split(self.cmd)
100         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
101         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
102         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
103         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
104
105         self.pid = os.fork()
106
107         if self.pid == 0:       #### CHILD ####
108             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
109             # Reopen stdin, out, err, on pipe ends:
110             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
111             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
112             if self.control_stderr:
113                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
114             # Ensure (within reason) stray file descriptors are closed:
115             excludes = [self.in_fd, self.out_fd, self.err_fd]
116             for i in range(4,100):
117                 if i not in excludes:
118                     try: os.close(i)
119                     except os.error: pass
120
121             try:
122                 os.execvp(cmd[0], cmd)
123                 os._exit(1)                     # Shouldn't get here
124
125             except os.error as e:
126                 if self.control_stderr:
127                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
128                 sys.stderr.write("**execvp failed, '%s'**\n" %
129                                  str(e))
130                 os._exit(1)
131             os._exit(1)                 # Shouldn't get here.
132
133         else:           ### PARENT ###
134             # Connect to the child's file descriptors, using our customized
135             # fdopen:
136             self.toChild = os.fdopen(pWc, 'w')
137             self.toChild_fdlist = [pWc]
138             self.readbuf = ReadBuf(pRc)
139             self.errbuf = ReadBuf(pRe)
140             time.sleep(execvp_grace_seconds)
141             try:
142                 pid, err = os.waitpid(self.pid, os.WNOHANG)
143             except os.error as error:
144                 errno, msg = error
145                 if errno == 10:
146                     self.pid = None
147                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
148                 self.pid = None
149                 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
150             if pid == self.pid:
151                 # child exited already
152                 self.pid == None
153                 sig = err & 0xff
154                 rc = (err & 0xff00) >> 8
155                 if sig:
156                     raise SubprocessError(
157                     "child killed by signal %d with a return code of %d"
158                     % (sig, rc))
159                 if rc:
160                     self.pid = None
161                     raise SubprocessError(
162                           "child exited with return code %d" % rc)
163                 # Child may have exited, but not in error, so we won't say
164                 # anything more at this point.
165
166     ### Write input to subprocess ###
167
168     def write(self, str):
169         """Write a STRING to the subprocess."""
170
171         if not self.pid:
172             raise SubprocessError("no child")                           # ===>
173         if select.select([],self.toChild_fdlist,[],0)[1]:
174             self.toChild.write(str)
175             self.toChild.flush()
176         else:
177             # XXX Can write-buffer full be handled better??
178             raise IOError("write to %s blocked" % self)                 # ===>
179
180     def writeline(self, line=''):
181         """Write STRING, with added newline termination, to subprocess."""
182         self.write(line + '\n')
183
184     ### Get output from subprocess ###
185
186     def peekPendingChar(self):
187         """Return, but (effectively) do not consume a single pending output
188         char, or return null string if none pending."""
189
190         return self.readbuf.peekPendingChar()                           # ===>
191     def peekPendingErrChar(self):
192         """Return, but (effectively) do not consume a single pending output
193         char, or return null string if none pending."""
194
195         return self.errbuf.peekPendingChar()                            # ===>
196
197     def waitForPendingChar(self, timeout, pollPause=.1):
198         """Block max TIMEOUT secs until we peek a pending char, returning the
199         char, or '' if none encountered.
200
201         Pause POLLPAUSE secs (default .1) between polls."""
202
203         accume = 0
204         while 1:
205             nextChar = self.readbuf.peekPendingChar()
206             if nextChar or (accume > timeout): return nextChar
207             time.sleep(pollPause)
208             accume = accume + pollPause
209
210     def read(self, n=None):
211         """Read N chars, or all pending if no N specified."""
212         if n == None:
213             return self.readPendingChars()
214         got = ''
215         while n:
216             got0 = self.readPendingChars(n)
217             got = got + got0
218             n = n - len(got0)
219         return got
220     def readPendingChars(self, max=None):
221         """Read all currently pending subprocess output as a single string."""
222         return self.readbuf.readPendingChars(max)
223     def readPendingErrChars(self):
224         """Read all currently pending subprocess error output as a single
225         string."""
226         if self.control_stderr:
227             return self.errbuf.readPendingChars()
228         else:
229             raise SubprocessError("Haven't grabbed subprocess error stream.")
230
231     def readPendingLine(self):
232         """Read currently pending subprocess output, up to a complete line
233         (newline inclusive)."""
234         return self.readbuf.readPendingLine()
235     def readPendingErrLine(self):
236         """Read currently pending subprocess error output, up to a complete
237         line (newline inclusive)."""
238         if self.control_stderr:
239             return self.errbuf.readPendingLine()
240         else:
241             raise SubprocessError("Haven't grabbed subprocess error stream.")
242
243     def readline(self):
244         """Return next complete line of subprocess output, blocking until
245         then."""
246         return self.readbuf.readline()
247     def readlineErr(self):
248         """Return next complete line of subprocess error output, blocking until
249         then."""
250         if self.control_stderr:
251             return self.errbuf.readline()
252         else:
253             raise SubprocessError("Haven't grabbed subprocess error stream.")
254
255     ### Subprocess Control ###
256
257     def active(self):
258         """True if subprocess is alive and kicking."""
259         return self.status(boolean=1)
260     def status(self, boolean=0):
261         """Return string indicating whether process is alive or dead."""
262         active = 0
263         if not self.cmd:
264             status = 'sans command'
265         elif not self.pid:
266             status = 'sans process'
267         elif not self.cont():
268             status = "(unresponding) '%s'" % self.cmd
269         else:
270             status = "'%s'" % self.cmd
271             active = 1
272         if boolean:
273             return active
274         else:
275             return status
276
277     def stop(self, verbose=1):
278         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
279         otherwise."""
280         try:
281             os.kill(self.pid, signal.SIGSTOP)
282         except os.error:
283             if verbose:
284                 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
285             return 0
286         if verbose: print("Stopped '%s'" % self.cmd)
287         return 'stopped'
288
289     def cont(self, verbose=0):
290         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
291         otherwise."""
292         try:
293             os.kill(self.pid, signal.SIGCONT)
294         except os.error:
295             if verbose:
296                 print(("Continue failed for '%s' - '%s'" %
297                        (self.cmd, sys.exc_value)))
298             return 0
299         if verbose: print("Continued '%s'" % self.cmd)
300         return 'continued'
301
302     def die(self):
303         """Send process PID signal SIG (default 9, 'kill'), returning None once
304          it is successfully reaped.
305
306         SubprocessError is raised if process is not successfully killed."""
307
308         if not self.pid:
309             raise SubprocessError("No process")                         # ===>
310         elif not self.cont():
311             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
312
313         # Try sending first a TERM and then a KILL signal.
314         keep_trying = 1
315         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
316         for sig in sigs:
317             try:
318                 os.kill(self.pid, sig[1])
319             except posix.error:
320                 # keep trying
321                 pass
322             # Try a couple or three times to reap the process with waitpid:
323             for i in range(5):
324                 # WNOHANG == 1 on sunos, presumably same elsewhere.
325                 if os.waitpid(self.pid, os.WNOHANG):
326                     if self.expire_noisily:
327                         print(("\n(%s subproc %d '%s' / %s)" %
328                                (sig[0], self.pid, self.cmd,
329                                 hex(id(self))[2:])))
330                     for i in self.pipefiles:
331                         os.close(i)
332                     self.pid = 0
333                     return None                                         # ===>
334                 time.sleep(.1)
335         # Only got here if subprocess is not gone:
336         raise SubprocessError(
337                "Failed kill of subproc %d, '%s', with signals %s" %
338                 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
339
340     def __del__(self):
341         """Terminate the subprocess"""
342         if self.pid:
343             self.die()
344
345     def __repr__(self):
346         status = self.status()
347         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
348
349 #############################################################################
350 #####                 Non-blocking read operations                      #####
351 #############################################################################
352
353 class ReadBuf:
354     """Output buffer for non-blocking reads on selectable files like pipes and
355     sockets.  Init with a file descriptor for the file."""
356
357     def __init__(self, fd, maxChunkSize=1024):
358         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
359         (default 1024)."""
360
361         if fd < 0:
362             raise ValueError
363         self.fd = fd
364         self.eof = 0                    # May be set with stuff still in .buf
365         self.buf = ''
366         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
367
368     def fileno(self):
369         return self.fd
370
371     def peekPendingChar(self):
372         """Return, but don't consume, first character of unconsumed output from
373         file, or empty string if none."""
374
375         if self.buf:
376             return self.buf[0]                                          # ===>
377
378         if self.eof:
379             return ''                                                   # ===>
380
381         sel = select.select([self.fd], [], [self.fd], 0)
382         if sel[2]:
383             self.eof = 1
384         if sel[0]:
385             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
386             return self.buf[0]          # Assume select don't lie.
387         else: return ''                                                 # ===>
388
389
390     def readPendingChar(self):
391         """Consume first character of unconsumed output from file, or empty
392         string if none."""
393
394         if self.buf:
395             got, self.buf = self.buf[0], self.buf[1:]
396             return got                                                  # ===>
397
398         if self.eof:
399             return ''                                                   # ===>
400
401         sel = select.select([self.fd], [], [self.fd], 0)
402         if sel[2]:
403             self.eof = 1
404         if sel[0]:
405             return os.read(self.fd, 1)                                  # ===>
406         else: return ''                                                 # ===>
407
408     def readPendingChars(self, max=None):
409         """Consume uncomsumed output from FILE, or empty string if nothing
410         pending."""
411
412         got = ""
413         if self.buf:
414             if (max > 0) and (len(self.buf) > max):
415                 got = self.buf[0:max]
416                 self.buf = self.buf[max:]
417             else:
418                 got, self.buf = self.buf, ''
419             return got
420
421         if self.eof:
422             return ''
423
424         sel = select.select([self.fd], [], [self.fd], 0)
425         if sel[2]:
426             self.eof = 1
427         if sel[0]:
428             got = got + os.read(self.fd, self.chunkSize)
429             if max == 0:
430                 self.buf = got
431                 return ''
432             elif max == None:
433                 return got
434             elif len(got) > max:
435                 self.buf = self.buf + got[max:]
436                 return got[:max]
437             else:
438                 return got
439         else: return ''
440
441     def readPendingLine(self, block=0):
442         """Return pending output from FILE, up to first newline (inclusive).
443
444         Does not block unless optional arg BLOCK is true.
445
446         Note that an error will be raised if a new eof is encountered without
447         any newline."""
448
449         if self.buf:
450             to = string.find(self.buf, '\n')
451             if to != -1:
452                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
453                 return got                                              # ===>
454             got, self.buf = self.buf, ''
455         else:
456             if self.eof:
457                 return ''                                               # ===>
458             got = ''
459
460         # Herein, 'got' contains the (former) contents of the buffer, and it
461         # doesn't include a newline.
462         fdlist = [self.fd]
463         period = block and 1.0 or 0     # don't be too busy while waiting
464         while 1:                        # (we'll only loop if block set)
465             sel = select.select(fdlist, [], fdlist, period)
466             if sel[2]:
467                 self.eof = 1
468             if sel[0]:
469                 got = got + os.read(self.fd, self.chunkSize)
470
471             to = string.find(got, '\n')
472             if to != -1:
473                 got, self.buf = got[:to+1], got[to+1:]
474                 return got                                              # ===>
475             if not block:
476                 return got                                              # ===>
477             if self.eof:
478                 self.buf = ''           # this is how an ordinary file acts...
479                 return got
480             # otherwise - no newline, blocking requested, no eof - loop. # ==^
481
482     def readline(self):
483         """Return next output line from file, blocking until it is received."""
484
485         return self.readPendingLine(1)                                  # ===>
486
487
488 #############################################################################
489 #####                 Encapsulated reading and writing                  #####
490 #############################################################################
491 # Encapsulate messages so the end can be unambiguously identified, even
492 # when they contain multiple, possibly empty lines.
493
494 class RecordFile:
495     """Encapsulate stream object for record-oriented IO.
496
497     Particularly useful when dealing with non-line oriented communications
498     over pipes, eg with subprocesses."""
499
500     # Message is written preceded by a line containing the message length.
501
502     def __init__(self, f):
503         self.file = f
504
505     def write_record(self, s):
506         "Write so self.read knows exactly how much to read."
507         f = self.__dict__['file']
508         f.write("%s\n%s" % (len(s), s))
509         if hasattr(f, 'flush'):
510             f.flush()
511
512     def read_record(self):
513         "Read and reconstruct message as prepared by self.write."
514         f = self.__dict__['file']
515         line = f.readline()[:-1]
516         if line:
517             try:
518                 l = string.atoi(line)
519             except ValueError:
520                 raise IOError(("corrupt %s file structure"
521                                 % self.__class__.__name__))
522             return f.read(l)
523         else:
524             # EOF.
525             return ''
526
527     def __getattr__(self, attr):
528         """Implement characteristic file object attributes."""
529         f = self.__dict__['file']
530         if hasattr(f, attr):
531             return getattr(f, attr)
532         else:
533             raise AttributeError(attr)
534
535     def __repr__(self):
536         return "<%s of %s at %s>" % (self.__class__.__name__,
537                                      self.__dict__['file'],
538                                      hex(id(self))[2:])
539
540 def record_trial(s):
541     """Exercise encapsulated write/read with an arbitrary string.
542
543     Raise IOError if the string gets distorted through transmission!"""
544     from StringIO import StringIO
545     sf = StringIO()
546     c = RecordFile(sf)
547     c.write(s)
548     c.seek(0)
549     r = c.read()
550     show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
551     if r != s:
552         raise IOError("String distorted:\n%s" % show)
553
554 #############################################################################
555 #####                   An example subprocess interfaces                #####
556 #############################################################################
557
558 class Ph:
559     """Convenient interface to CCSO 'ph' nameserver subprocess.
560
561     .query('string...') method takes a query and returns a list of dicts, each
562     of which represents one entry."""
563
564     # Note that i made this a class that handles a subprocess object, rather
565     # than one that inherits from it.  I didn't see any functional
566     # disadvantages, and didn't think that full support of the entire
567     # Subprocess functionality was in any way suitable for interaction with
568     # this specialized interface.  ?  klm 13-Jan-1995
569
570     def __init__(self):
571         try:
572             self.proc = Subprocess('ph', 1)
573         except:
574             raise SubprocessError('failure starting ph: %s' %         # ===>
575                                     str(sys.exc_value))
576
577     def query(self, q):
578         """Send a query and return a list of dicts for responses.
579
580         Raise a ValueError if ph responds with an error."""
581
582         self.clear()
583
584         self.proc.writeline('query ' + q)
585         got = []; it = {}
586         while 1:
587             response = self.getreply()  # Should get null on new prompt.
588             errs = self.proc.readPendingErrChars()
589             if errs:
590                 sys.stderr.write(errs)
591             if it:
592                 got.append(it)
593                 it = {}
594             if not response:
595                 return got                                              # ===>
596             elif type(response) == types.StringType:
597                 raise ValueError("ph failed match: '%s'" % response)    # ===>
598             for line in response:
599                 # convert to a dict:
600                 line = string.splitfields(line, ':')
601                 it[string.strip(line[0])] = (
602                     string.strip(string.join(line[1:])))
603
604     def getreply(self):
605         """Consume next response from ph, returning list of lines or string
606         err."""
607         # Key on first char:  (First line may lack newline.)
608         #  - dash       discard line
609         #  - 'ph> '     conclusion of response
610         #  - number     error message
611         #  - whitespace beginning of next response
612
613         nextChar = self.proc.waitForPendingChar(60)
614         if not nextChar:
615             raise SubprocessError('ph subprocess not responding')       # ===>
616         elif nextChar == '-':
617             # dashed line - discard it, and continue reading:
618             self.proc.readline()
619             return self.getreply()                                      # ===>
620         elif nextChar == 'p':
621             # 'ph> ' prompt - don't think we should hit this, but what the hay:
622             return ''                                                   # ===>
623         elif nextChar in '0123456789':
624             # Error notice - we're currently assuming single line errors:
625             return self.proc.readline()[:-1]                            # ===>
626         elif nextChar in ' \t':
627             # Get content, up to next dashed line:
628             got = []
629             while nextChar != '-' and nextChar != '':
630                 got.append(self.proc.readline()[:-1])
631                 nextChar = self.proc.peekPendingChar()
632             return got
633     def __repr__(self):
634         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
635                                              hex(id(self))[2:])
636     def clear(self):
637         """Clear-out initial preface or residual subproc input and output."""
638         pause = .5; maxIter = 10        # 5 seconds to clear
639         iterations = 0
640         got = ''
641         self.proc.write('')
642         while iterations < maxIter:
643             got = got + self.proc.readPendingChars()
644             # Strip out all but the last incomplete line:
645             got = string.splitfields(got, '\n')[-1]
646             if got == 'ph> ': return    # Ok.                             ===>
647             time.sleep(pause)
648         raise SubprocessError('ph not responding within %s secs' %
649                                 pause * maxIter)
650
651 #############################################################################
652 #####                             Test                                  #####
653 #############################################################################
654
655 def test(p=0):
656     print("\tOpening bogus subprocess, should fail:")
657     try:
658         b = Subprocess('/', 1)
659         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
660     except SubprocessError:
661         print("\t...yep, it failed.")
662     print("\tOpening cat subprocess:")
663     p = Subprocess('cat', 1)            # set to expire noisily...
664     print(p)
665     print('\tWrite, then read, two newline-teriminated lines, using readline:')
666     p.write('first full line written\n'); p.write('second.\n')
667     print(repr(p.readline()))
668     print(repr(p.readline()))
669     print('\tThree lines, last sans newline, read using combination:')
670     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
671     print('\tFirst line via readline:')
672     print(repr(p.readline()))
673     print('\tRest via readPendingChars:')
674     print(p.readPendingChars())
675     print("\tStopping then continuing subprocess (verbose):")
676     if not p.stop(1):                   # verbose stop
677         print('\t** Stop seems to have failed!')
678     else:
679         print('\tWriting line while subprocess is paused...')
680         p.write('written while subprocess paused\n')
681         print('\tNonblocking read of paused subprocess (should be empty):')
682         print(p.readPendingChars())
683         print('\tContinuing subprocess (verbose):')
684         if not p.cont(1):               # verbose continue
685             print('\t** Continue seems to have failed!  Probly lost subproc...')
686             return p
687         else:
688             print('\tReading accumulated line, blocking read:')
689             print(p.readline())
690             print("\tDeleting subproc, which was set to die noisily:")
691             del p
692             print("\tDone.")
693             return None
694
695 if __name__ == '__main__':
696     test()