]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Doc(subproc.py): modified by phd
[bookmarks_db.git] / subproc.py
1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
2
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
4 routines.
5
6 Subprocess class features:
7
8  - provides non-blocking stdin and stderr reads
9
10  - provides subprocess stop and continue, kill-on-deletion
11
12  - provides detection of subprocess startup failure
13
14  - Subprocess objects have nice, informative string rep (as every good object
15    ought)."""
16
17 __version__ = "Revision: 1.15 "
18
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
21
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 #            and subprocesses, primarily one by jose pereira.
24
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 #   because i don't know what all in it is portable and what is not.  I'm not
28 #   about to provide for different platform contingencies - at that extent, the
29 #   effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 #           checked on regular IO, when a handler is defined, and passed to the
32 #           handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
34
35 # - new additions (1.3, 1.4):
36 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 #    reads based solely on os.read with select, while capitalizing big-time on
38 #    multi-char read chunking.
39 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
40 #
41 # ken.manheimer@nist.gov
42
43 # This is a modified version by Oleg Broytman <phd@phdru.name>.
44 # The original version is still preserved at
45 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
46
47 import sys, os, string, time, types
48 import select
49 import signal
50
51
52 SubprocessError = 'SubprocessError'
53 # You may need to increase execvp_grace_seconds, if you have a large or slow
54 # path to search:
55 execvp_grace_seconds = 0.5
56
57 class Subprocess:
58     """Run and communicate asynchronously with a subprocess.
59
60     Provides non-blocking reads in the form of .readPendingChars and
61     .readPendingLine.
62
63     .readline will block until it gets a complete line.
64
65     .peekPendingChar does a non-blocking, non-consuming read for pending
66     output, and can be used before .readline to check non-destructively for
67     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
68     a new character is pending, or timeout secs pass, with granularity of
69     pollPause seconds.
70
71     There are corresponding read and peekPendingErrXXX routines, to read from
72     the subprocess stderr stream."""
73
74     pid = 0
75     cmd = ''
76     expire_noisily = 1                  # Announce subproc destruction?
77     pipefiles = []
78     readbuf = 0                         # fork will assign to be a readbuf obj
79     errbuf = 0                          # fork will assign to be a readbuf obj
80
81     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
82                  in_fd=0, out_fd=1, err_fd=2):
83         """Launch a subprocess, given command string COMMAND."""
84         self.cmd = cmd
85         self.pid = 0
86         self.expire_noisily = expire_noisily
87         self.control_stderr = control_stderr
88         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
89         self.fork()
90
91     def fork(self, cmd=None):
92         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
93         if cmd: self.cmd = cmd
94         else: cmd = self.cmd
95         cmd = string.split(self.cmd)
96         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
97         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
98         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
99         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
100
101         self.pid = os.fork()
102
103         if self.pid == 0:       #### CHILD ####
104             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
105             # Reopen stdin, out, err, on pipe ends:
106             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
107             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
108             if self.control_stderr:
109                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
110             # Ensure (within reason) stray file descriptors are closed:
111             excludes = [self.in_fd, self.out_fd, self.err_fd]
112             for i in range(4,100):
113                 if i not in excludes:
114                     try: os.close(i)
115                     except os.error: pass
116
117             try:
118                 os.execvp(cmd[0], cmd)
119                 os._exit(1)                     # Shouldn't get here
120
121             except os.error as e:
122                 if self.control_stderr:
123                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
124                 sys.stderr.write("**execvp failed, '%s'**\n" %
125                                  str(e))
126                 os._exit(1)
127             os._exit(1)                 # Shouldn't get here.
128
129         else:           ### PARENT ###
130             # Connect to the child's file descriptors, using our customized
131             # fdopen:
132             self.toChild = os.fdopen(pWc, 'w')
133             self.toChild_fdlist = [pWc]
134             self.readbuf = ReadBuf(pRc)
135             self.errbuf = ReadBuf(pRe)
136             time.sleep(execvp_grace_seconds)
137             try:
138                 pid, err = os.waitpid(self.pid, os.WNOHANG)
139             except os.error as error:
140                 errno, msg = error
141                 if errno == 10:
142                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
143                 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
144             if pid == self.pid:
145                 # child exited already
146                 self.pid == None
147                 sig = err & 0xff
148                 rc = (err & 0xff00) >> 8
149                 if sig:
150                     raise SubprocessError(
151                     "child killed by signal %d with a return code of %d"
152                     % (sig, rc))
153                 if rc:
154                     raise SubprocessError(
155                           "child exited with return code %d" % rc)
156                 # Child may have exited, but not in error, so we won't say
157                 # anything more at this point.
158
159     ### Write input to subprocess ###
160
161     def write(self, str):
162         """Write a STRING to the subprocess."""
163
164         if not self.pid:
165             raise SubprocessError("no child")                           # ===>
166         if select.select([],self.toChild_fdlist,[],0)[1]:
167             self.toChild.write(str)
168             self.toChild.flush()
169         else:
170             # XXX Can write-buffer full be handled better??
171             raise IOError("write to %s blocked" % self)                 # ===>
172
173     def writeline(self, line=''):
174         """Write STRING, with added newline termination, to subprocess."""
175         self.write(line + '\n')
176
177     ### Get output from subprocess ###
178
179     def peekPendingChar(self):
180         """Return, but (effectively) do not consume a single pending output
181         char, or return null string if none pending."""
182
183         return self.readbuf.peekPendingChar()                           # ===>
184     def peekPendingErrChar(self):
185         """Return, but (effectively) do not consume a single pending output
186         char, or return null string if none pending."""
187
188         return self.errbuf.peekPendingChar()                            # ===>
189
190     def waitForPendingChar(self, timeout, pollPause=.1):
191         """Block max TIMEOUT secs until we peek a pending char, returning the
192         char, or '' if none encountered.
193
194         Pause POLLPAUSE secs (default .1) between polls."""
195
196         accume = 0
197         while 1:
198             nextChar = self.readbuf.peekPendingChar()
199             if nextChar or (accume > timeout): return nextChar
200             time.sleep(pollPause)
201             accume = accume + pollPause
202
203     def read(self, n=None):
204         """Read N chars, or all pending if no N specified."""
205         if n == None:
206             return self.readPendingChars()
207         got = ''
208         while n:
209             got0 = self.readPendingChars(n)
210             got = got + got0
211             n = n - len(got0)
212         return got
213     def readPendingChars(self, max=None):
214         """Read all currently pending subprocess output as a single string."""
215         return self.readbuf.readPendingChars(max)
216     def readPendingErrChars(self):
217         """Read all currently pending subprocess error output as a single
218         string."""
219         if self.control_stderr:
220             return self.errbuf.readPendingChars()
221         else:
222             raise SubprocessError("Haven't grabbed subprocess error stream.")
223
224     def readPendingLine(self):
225         """Read currently pending subprocess output, up to a complete line
226         (newline inclusive)."""
227         return self.readbuf.readPendingLine()
228     def readPendingErrLine(self):
229         """Read currently pending subprocess error output, up to a complete
230         line (newline inclusive)."""
231         if self.control_stderr:
232             return self.errbuf.readPendingLine()
233         else:
234             raise SubprocessError("Haven't grabbed subprocess error stream.")
235
236     def readline(self):
237         """Return next complete line of subprocess output, blocking until
238         then."""
239         return self.readbuf.readline()
240     def readlineErr(self):
241         """Return next complete line of subprocess error output, blocking until
242         then."""
243         if self.control_stderr:
244             return self.errbuf.readline()
245         else:
246             raise SubprocessError("Haven't grabbed subprocess error stream.")
247
248     ### Subprocess Control ###
249
250     def active(self):
251         """True if subprocess is alive and kicking."""
252         return self.status(boolean=1)
253     def status(self, boolean=0):
254         """Return string indicating whether process is alive or dead."""
255         active = 0
256         if not self.cmd:
257             status = 'sans command'
258         elif not self.pid:
259             status = 'sans process'
260         elif not self.cont():
261             status = "(unresponding) '%s'" % self.cmd
262         else:
263             status = "'%s'" % self.cmd
264             active = 1
265         if boolean:
266             return active
267         else:
268             return status
269
270     def stop(self, verbose=1):
271         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
272         otherwise."""
273         try:
274             os.kill(self.pid, signal.SIGSTOP)
275         except os.error:
276             if verbose:
277                 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
278             return 0
279         if verbose: print("Stopped '%s'" % self.cmd)
280         return 'stopped'
281
282     def cont(self, verbose=0):
283         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
284         otherwise."""
285         try:
286             os.kill(self.pid, signal.SIGCONT)
287         except os.error:
288             if verbose:
289                 print(("Continue failed for '%s' - '%s'" %
290                        (self.cmd, sys.exc_value)))
291             return 0
292         if verbose: print("Continued '%s'" % self.cmd)
293         return 'continued'
294
295     def die(self):
296         """Send process PID signal SIG (default 9, 'kill'), returning None once
297          it is successfully reaped.
298
299         SubprocessError is raised if process is not successfully killed."""
300
301         if not self.pid:
302             raise SubprocessError("No process")                         # ===>
303         elif not self.cont():
304             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
305
306         # Try sending first a TERM and then a KILL signal.
307         keep_trying = 1
308         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
309         for sig in sigs:
310             try:
311                 os.kill(self.pid, sig[1])
312             except posix.error:
313                 # keep trying
314                 pass
315             # Try a couple or three times to reap the process with waitpid:
316             for i in range(5):
317                 # WNOHANG == 1 on sunos, presumably same elsewhere.
318                 if os.waitpid(self.pid, os.WNOHANG):
319                     if self.expire_noisily:
320                         print(("\n(%s subproc %d '%s' / %s)" %
321                                (sig[0], self.pid, self.cmd,
322                                 hex(id(self))[2:])))
323                     for i in self.pipefiles:
324                         os.close(i)
325                     self.pid = 0
326                     return None                                         # ===>
327                 time.sleep(.1)
328         # Only got here if subprocess is not gone:
329         raise SubprocessError(
330                "Failed kill of subproc %d, '%s', with signals %s" %
331                 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
332
333     def __del__(self):
334         """Terminate the subprocess"""
335         if self.pid:
336             self.die()
337
338     def __repr__(self):
339         status = self.status()
340         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
341
342 #############################################################################
343 #####                 Non-blocking read operations                      #####
344 #############################################################################
345
346 class ReadBuf:
347     """Output buffer for non-blocking reads on selectable files like pipes and
348     sockets.  Init with a file descriptor for the file."""
349
350     def __init__(self, fd, maxChunkSize=1024):
351         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
352         (default 1024)."""
353
354         if fd < 0:
355             raise ValueError
356         self.fd = fd
357         self.eof = 0                    # May be set with stuff still in .buf
358         self.buf = ''
359         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
360
361     def fileno(self):
362         return self.fd
363
364     def peekPendingChar(self):
365         """Return, but don't consume, first character of unconsumed output from
366         file, or empty string if none."""
367
368         if self.buf:
369             return self.buf[0]                                          # ===>
370
371         if self.eof:
372             return ''                                                   # ===>
373
374         sel = select.select([self.fd], [], [self.fd], 0)
375         if sel[2]:
376             self.eof = 1
377         if sel[0]:
378             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
379             return self.buf[0]          # Assume select don't lie.
380         else: return ''                                                 # ===>
381
382
383     def readPendingChar(self):
384         """Consume first character of unconsumed output from file, or empty
385         string if none."""
386
387         if self.buf:
388             got, self.buf = self.buf[0], self.buf[1:]
389             return got                                                  # ===>
390
391         if self.eof:
392             return ''                                                   # ===>
393
394         sel = select.select([self.fd], [], [self.fd], 0)
395         if sel[2]:
396             self.eof = 1
397         if sel[0]:
398             return os.read(self.fd, 1)                                  # ===>
399         else: return ''                                                 # ===>
400
401     def readPendingChars(self, max=None):
402         """Consume uncomsumed output from FILE, or empty string if nothing
403         pending."""
404
405         got = ""
406         if self.buf:
407             if (max > 0) and (len(self.buf) > max):
408                 got = self.buf[0:max]
409                 self.buf = self.buf[max:]
410             else:
411                 got, self.buf = self.buf, ''
412             return got
413
414         if self.eof:
415             return ''
416
417         sel = select.select([self.fd], [], [self.fd], 0)
418         if sel[2]:
419             self.eof = 1
420         if sel[0]:
421             got = got + os.read(self.fd, self.chunkSize)
422             if max == 0:
423                 self.buf = got
424                 return ''
425             elif max == None:
426                 return got
427             elif len(got) > max:
428                 self.buf = self.buf + got[max:]
429                 return got[:max]
430             else:
431                 return got
432         else: return ''
433
434     def readPendingLine(self, block=0):
435         """Return pending output from FILE, up to first newline (inclusive).
436
437         Does not block unless optional arg BLOCK is true.
438
439         Note that an error will be raised if a new eof is encountered without
440         any newline."""
441
442         if self.buf:
443             to = string.find(self.buf, '\n')
444             if to != -1:
445                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
446                 return got                                              # ===>
447             got, self.buf = self.buf, ''
448         else:
449             if self.eof:
450                 return ''                                               # ===>
451             got = ''
452
453         # Herein, 'got' contains the (former) contents of the buffer, and it
454         # doesn't include a newline.
455         fdlist = [self.fd]
456         period = block and 1.0 or 0     # don't be too busy while waiting
457         while 1:                        # (we'll only loop if block set)
458             sel = select.select(fdlist, [], fdlist, period)
459             if sel[2]:
460                 self.eof = 1
461             if sel[0]:
462                 got = got + os.read(self.fd, self.chunkSize)
463
464             to = string.find(got, '\n')
465             if to != -1:
466                 got, self.buf = got[:to+1], got[to+1:]
467                 return got                                              # ===>
468             if not block:
469                 return got                                              # ===>
470             if self.eof:
471                 self.buf = ''           # this is how an ordinary file acts...
472                 return got
473             # otherwise - no newline, blocking requested, no eof - loop. # ==^
474
475     def readline(self):
476         """Return next output line from file, blocking until it is received."""
477
478         return self.readPendingLine(1)                                  # ===>
479
480
481 #############################################################################
482 #####                 Encapsulated reading and writing                  #####
483 #############################################################################
484 # Encapsulate messages so the end can be unambiguously identified, even
485 # when they contain multiple, possibly empty lines.
486
487 class RecordFile:
488     """Encapsulate stream object for record-oriented IO.
489
490     Particularly useful when dealing with non-line oriented communications
491     over pipes, eg with subprocesses."""
492
493     # Message is written preceded by a line containing the message length.
494
495     def __init__(self, f):
496         self.file = f
497
498     def write_record(self, s):
499         "Write so self.read knows exactly how much to read."
500         f = self.__dict__['file']
501         f.write("%s\n%s" % (len(s), s))
502         if hasattr(f, 'flush'):
503             f.flush()
504
505     def read_record(self):
506         "Read and reconstruct message as prepared by self.write."
507         f = self.__dict__['file']
508         line = f.readline()[:-1]
509         if line:
510             try:
511                 l = string.atoi(line)
512             except ValueError:
513                 raise IOError(("corrupt %s file structure"
514                                 % self.__class__.__name__))
515             return f.read(l)
516         else:
517             # EOF.
518             return ''
519
520     def __getattr__(self, attr):
521         """Implement characteristic file object attributes."""
522         f = self.__dict__['file']
523         if hasattr(f, attr):
524             return getattr(f, attr)
525         else:
526             raise AttributeError(attr)
527
528     def __repr__(self):
529         return "<%s of %s at %s>" % (self.__class__.__name__,
530                                      self.__dict__['file'],
531                                      hex(id(self))[2:])
532
533 def record_trial(s):
534     """Exercise encapsulated write/read with an arbitrary string.
535
536     Raise IOError if the string gets distorted through transmission!"""
537     from StringIO import StringIO
538     sf = StringIO()
539     c = RecordFile(sf)
540     c.write(s)
541     c.seek(0)
542     r = c.read()
543     show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
544     if r != s:
545         raise IOError("String distorted:\n%s" % show)
546
547 #############################################################################
548 #####                   An example subprocess interfaces                #####
549 #############################################################################
550
551 class Ph:
552     """Convenient interface to CCSO 'ph' nameserver subprocess.
553
554     .query('string...') method takes a query and returns a list of dicts, each
555     of which represents one entry."""
556
557     # Note that i made this a class that handles a subprocess object, rather
558     # than one that inherits from it.  I didn't see any functional
559     # disadvantages, and didn't think that full support of the entire
560     # Subprocess functionality was in any way suitable for interaction with
561     # this specialized interface.  ?  klm 13-Jan-1995
562
563     def __init__(self):
564         try:
565             self.proc = Subprocess('ph', 1)
566         except:
567             raise SubprocessError('failure starting ph: %s' %         # ===>
568                                     str(sys.exc_value))
569
570     def query(self, q):
571         """Send a query and return a list of dicts for responses.
572
573         Raise a ValueError if ph responds with an error."""
574
575         self.clear()
576
577         self.proc.writeline('query ' + q)
578         got = []; it = {}
579         while 1:
580             response = self.getreply()  # Should get null on new prompt.
581             errs = self.proc.readPendingErrChars()
582             if errs:
583                 sys.stderr.write(errs)
584             if it:
585                 got.append(it)
586                 it = {}
587             if not response:
588                 return got                                              # ===>
589             elif type(response) == types.StringType:
590                 raise ValueError("ph failed match: '%s'" % response)    # ===>
591             for line in response:
592                 # convert to a dict:
593                 line = string.splitfields(line, ':')
594                 it[string.strip(line[0])] = (
595                     string.strip(string.join(line[1:])))
596
597     def getreply(self):
598         """Consume next response from ph, returning list of lines or string
599         err."""
600         # Key on first char:  (First line may lack newline.)
601         #  - dash       discard line
602         #  - 'ph> '     conclusion of response
603         #  - number     error message
604         #  - whitespace beginning of next response
605
606         nextChar = self.proc.waitForPendingChar(60)
607         if not nextChar:
608             raise SubprocessError('ph subprocess not responding')       # ===>
609         elif nextChar == '-':
610             # dashed line - discard it, and continue reading:
611             self.proc.readline()
612             return self.getreply()                                      # ===>
613         elif nextChar == 'p':
614             # 'ph> ' prompt - don't think we should hit this, but what the hay:
615             return ''                                                   # ===>
616         elif nextChar in '0123456789':
617             # Error notice - we're currently assuming single line errors:
618             return self.proc.readline()[:-1]                            # ===>
619         elif nextChar in ' \t':
620             # Get content, up to next dashed line:
621             got = []
622             while nextChar != '-' and nextChar != '':
623                 got.append(self.proc.readline()[:-1])
624                 nextChar = self.proc.peekPendingChar()
625             return got
626     def __repr__(self):
627         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
628                                              hex(id(self))[2:])
629     def clear(self):
630         """Clear-out initial preface or residual subproc input and output."""
631         pause = .5; maxIter = 10        # 5 seconds to clear
632         iterations = 0
633         got = ''
634         self.proc.write('')
635         while iterations < maxIter:
636             got = got + self.proc.readPendingChars()
637             # Strip out all but the last incomplete line:
638             got = string.splitfields(got, '\n')[-1]
639             if got == 'ph> ': return    # Ok.                             ===>
640             time.sleep(pause)
641         raise SubprocessError('ph not responding within %s secs' %
642                                 pause * maxIter)
643
644 #############################################################################
645 #####                             Test                                  #####
646 #############################################################################
647
648 def test(p=0):
649     print("\tOpening subprocess:")
650     p = Subprocess('cat', 1)            # set to expire noisily...
651     print(p)
652     print("\tOpening bogus subprocess, should fail:")
653     try:
654         b = Subprocess('/', 1)
655         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
656     except SubprocessError:
657         print("\t...yep, it failed.")
658     print('\tWrite, then read, two newline-teriminated lines, using readline:')
659     p.write('first full line written\n'); p.write('second.\n')
660     print(repr(p.readline()))
661     print(repr(p.readline()))
662     print('\tThree lines, last sans newline, read using combination:')
663     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
664     print('\tFirst line via readline:')
665     print(repr(p.readline()))
666     print('\tRest via readPendingChars:')
667     print(p.readPendingChars())
668     print("\tStopping then continuing subprocess (verbose):")
669     if not p.stop(1):                   # verbose stop
670         print('\t** Stop seems to have failed!')
671     else:
672         print('\tWriting line while subprocess is paused...')
673         p.write('written while subprocess paused\n')
674         print('\tNonblocking read of paused subprocess (should be empty):')
675         print(p.readPendingChars())
676         print('\tContinuing subprocess (verbose):')
677         if not p.cont(1):               # verbose continue
678             print('\t** Continue seems to have failed!  Probly lost subproc...')
679             return p
680         else:
681             print('\tReading accumulated line, blocking read:')
682             print(p.readline())
683             print("\tDeleting subproc, which was set to die noisily:")
684             del p
685             print("\tDone.")
686             return None