]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Feat(subproc.py): Run tests
[bookmarks_db.git] / subproc.py
1 #! /usr/bin/env python
2
3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
4
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 routines.
7
8 Subprocess class features:
9
10  - provides non-blocking stdin and stderr reads
11
12  - provides subprocess stop and continue, kill-on-deletion
13
14  - provides detection of subprocess startup failure
15
16  - Subprocess objects have nice, informative string rep (as every good object
17    ought)."""
18
19 __version__ = "Revision: 1.15 "
20
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
23
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 #            and subprocesses, primarily one by jose pereira.
26
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 #   because i don't know what all in it is portable and what is not.  I'm not
30 #   about to provide for different platform contingencies - at that extent, the
31 #   effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 #           checked on regular IO, when a handler is defined, and passed to the
34 #           handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
36
37 # - new additions (1.3, 1.4):
38 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 #    reads based solely on os.read with select, while capitalizing big-time on
40 #    multi-char read chunking.
41 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
42 #
43 # ken.manheimer@nist.gov
44
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
48
49 import sys, os, string, time, types
50 import select
51 import signal
52
53
54 class SubprocessError(Exception):
55     pass
56
57 # You may need to increase execvp_grace_seconds, if you have a large or slow
58 # path to search:
59 execvp_grace_seconds = 0.5
60
61 class Subprocess:
62     """Run and communicate asynchronously with a subprocess.
63
64     Provides non-blocking reads in the form of .readPendingChars and
65     .readPendingLine.
66
67     .readline will block until it gets a complete line.
68
69     .peekPendingChar does a non-blocking, non-consuming read for pending
70     output, and can be used before .readline to check non-destructively for
71     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
72     a new character is pending, or timeout secs pass, with granularity of
73     pollPause seconds.
74
75     There are corresponding read and peekPendingErrXXX routines, to read from
76     the subprocess stderr stream."""
77
78     pid = 0
79     cmd = ''
80     expire_noisily = 1                  # Announce subproc destruction?
81     pipefiles = []
82     readbuf = 0                         # fork will assign to be a readbuf obj
83     errbuf = 0                          # fork will assign to be a readbuf obj
84
85     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
86                  in_fd=0, out_fd=1, err_fd=2):
87         """Launch a subprocess, given command string COMMAND."""
88         self.cmd = cmd
89         self.pid = 0
90         self.expire_noisily = expire_noisily
91         self.control_stderr = control_stderr
92         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
93         self.fork()
94
95     def fork(self, cmd=None):
96         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
97         if cmd: self.cmd = cmd
98         else: cmd = self.cmd
99         cmd = string.split(self.cmd)
100         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
101         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
102         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
103         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
104
105         self.pid = os.fork()
106
107         if self.pid == 0:       #### CHILD ####
108             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
109             # Reopen stdin, out, err, on pipe ends:
110             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
111             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
112             if self.control_stderr:
113                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
114             # Ensure (within reason) stray file descriptors are closed:
115             excludes = [self.in_fd, self.out_fd, self.err_fd]
116             for i in range(4,100):
117                 if i not in excludes:
118                     try: os.close(i)
119                     except os.error: pass
120
121             try:
122                 os.execvp(cmd[0], cmd)
123                 os._exit(1)                     # Shouldn't get here
124
125             except os.error as e:
126                 if self.control_stderr:
127                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
128                 sys.stderr.write("**execvp failed, '%s'**\n" %
129                                  str(e))
130                 os._exit(1)
131             os._exit(1)                 # Shouldn't get here.
132
133         else:           ### PARENT ###
134             # Connect to the child's file descriptors, using our customized
135             # fdopen:
136             self.toChild = os.fdopen(pWc, 'w')
137             self.toChild_fdlist = [pWc]
138             self.readbuf = ReadBuf(pRc)
139             self.errbuf = ReadBuf(pRe)
140             time.sleep(execvp_grace_seconds)
141             try:
142                 pid, err = os.waitpid(self.pid, os.WNOHANG)
143             except os.error as error:
144                 errno, msg = error
145                 if errno == 10:
146                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
147                 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
148             if pid == self.pid:
149                 # child exited already
150                 self.pid == None
151                 sig = err & 0xff
152                 rc = (err & 0xff00) >> 8
153                 if sig:
154                     raise SubprocessError(
155                     "child killed by signal %d with a return code of %d"
156                     % (sig, rc))
157                 if rc:
158                     raise SubprocessError(
159                           "child exited with return code %d" % rc)
160                 # Child may have exited, but not in error, so we won't say
161                 # anything more at this point.
162
163     ### Write input to subprocess ###
164
165     def write(self, str):
166         """Write a STRING to the subprocess."""
167
168         if not self.pid:
169             raise SubprocessError("no child")                           # ===>
170         if select.select([],self.toChild_fdlist,[],0)[1]:
171             self.toChild.write(str)
172             self.toChild.flush()
173         else:
174             # XXX Can write-buffer full be handled better??
175             raise IOError("write to %s blocked" % self)                 # ===>
176
177     def writeline(self, line=''):
178         """Write STRING, with added newline termination, to subprocess."""
179         self.write(line + '\n')
180
181     ### Get output from subprocess ###
182
183     def peekPendingChar(self):
184         """Return, but (effectively) do not consume a single pending output
185         char, or return null string if none pending."""
186
187         return self.readbuf.peekPendingChar()                           # ===>
188     def peekPendingErrChar(self):
189         """Return, but (effectively) do not consume a single pending output
190         char, or return null string if none pending."""
191
192         return self.errbuf.peekPendingChar()                            # ===>
193
194     def waitForPendingChar(self, timeout, pollPause=.1):
195         """Block max TIMEOUT secs until we peek a pending char, returning the
196         char, or '' if none encountered.
197
198         Pause POLLPAUSE secs (default .1) between polls."""
199
200         accume = 0
201         while 1:
202             nextChar = self.readbuf.peekPendingChar()
203             if nextChar or (accume > timeout): return nextChar
204             time.sleep(pollPause)
205             accume = accume + pollPause
206
207     def read(self, n=None):
208         """Read N chars, or all pending if no N specified."""
209         if n == None:
210             return self.readPendingChars()
211         got = ''
212         while n:
213             got0 = self.readPendingChars(n)
214             got = got + got0
215             n = n - len(got0)
216         return got
217     def readPendingChars(self, max=None):
218         """Read all currently pending subprocess output as a single string."""
219         return self.readbuf.readPendingChars(max)
220     def readPendingErrChars(self):
221         """Read all currently pending subprocess error output as a single
222         string."""
223         if self.control_stderr:
224             return self.errbuf.readPendingChars()
225         else:
226             raise SubprocessError("Haven't grabbed subprocess error stream.")
227
228     def readPendingLine(self):
229         """Read currently pending subprocess output, up to a complete line
230         (newline inclusive)."""
231         return self.readbuf.readPendingLine()
232     def readPendingErrLine(self):
233         """Read currently pending subprocess error output, up to a complete
234         line (newline inclusive)."""
235         if self.control_stderr:
236             return self.errbuf.readPendingLine()
237         else:
238             raise SubprocessError("Haven't grabbed subprocess error stream.")
239
240     def readline(self):
241         """Return next complete line of subprocess output, blocking until
242         then."""
243         return self.readbuf.readline()
244     def readlineErr(self):
245         """Return next complete line of subprocess error output, blocking until
246         then."""
247         if self.control_stderr:
248             return self.errbuf.readline()
249         else:
250             raise SubprocessError("Haven't grabbed subprocess error stream.")
251
252     ### Subprocess Control ###
253
254     def active(self):
255         """True if subprocess is alive and kicking."""
256         return self.status(boolean=1)
257     def status(self, boolean=0):
258         """Return string indicating whether process is alive or dead."""
259         active = 0
260         if not self.cmd:
261             status = 'sans command'
262         elif not self.pid:
263             status = 'sans process'
264         elif not self.cont():
265             status = "(unresponding) '%s'" % self.cmd
266         else:
267             status = "'%s'" % self.cmd
268             active = 1
269         if boolean:
270             return active
271         else:
272             return status
273
274     def stop(self, verbose=1):
275         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
276         otherwise."""
277         try:
278             os.kill(self.pid, signal.SIGSTOP)
279         except os.error:
280             if verbose:
281                 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
282             return 0
283         if verbose: print("Stopped '%s'" % self.cmd)
284         return 'stopped'
285
286     def cont(self, verbose=0):
287         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
288         otherwise."""
289         try:
290             os.kill(self.pid, signal.SIGCONT)
291         except os.error:
292             if verbose:
293                 print(("Continue failed for '%s' - '%s'" %
294                        (self.cmd, sys.exc_value)))
295             return 0
296         if verbose: print("Continued '%s'" % self.cmd)
297         return 'continued'
298
299     def die(self):
300         """Send process PID signal SIG (default 9, 'kill'), returning None once
301          it is successfully reaped.
302
303         SubprocessError is raised if process is not successfully killed."""
304
305         if not self.pid:
306             raise SubprocessError("No process")                         # ===>
307         elif not self.cont():
308             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
309
310         # Try sending first a TERM and then a KILL signal.
311         keep_trying = 1
312         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
313         for sig in sigs:
314             try:
315                 os.kill(self.pid, sig[1])
316             except posix.error:
317                 # keep trying
318                 pass
319             # Try a couple or three times to reap the process with waitpid:
320             for i in range(5):
321                 # WNOHANG == 1 on sunos, presumably same elsewhere.
322                 if os.waitpid(self.pid, os.WNOHANG):
323                     if self.expire_noisily:
324                         print(("\n(%s subproc %d '%s' / %s)" %
325                                (sig[0], self.pid, self.cmd,
326                                 hex(id(self))[2:])))
327                     for i in self.pipefiles:
328                         os.close(i)
329                     self.pid = 0
330                     return None                                         # ===>
331                 time.sleep(.1)
332         # Only got here if subprocess is not gone:
333         raise SubprocessError(
334                "Failed kill of subproc %d, '%s', with signals %s" %
335                 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
336
337     def __del__(self):
338         """Terminate the subprocess"""
339         if self.pid:
340             self.die()
341
342     def __repr__(self):
343         status = self.status()
344         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
345
346 #############################################################################
347 #####                 Non-blocking read operations                      #####
348 #############################################################################
349
350 class ReadBuf:
351     """Output buffer for non-blocking reads on selectable files like pipes and
352     sockets.  Init with a file descriptor for the file."""
353
354     def __init__(self, fd, maxChunkSize=1024):
355         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
356         (default 1024)."""
357
358         if fd < 0:
359             raise ValueError
360         self.fd = fd
361         self.eof = 0                    # May be set with stuff still in .buf
362         self.buf = ''
363         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
364
365     def fileno(self):
366         return self.fd
367
368     def peekPendingChar(self):
369         """Return, but don't consume, first character of unconsumed output from
370         file, or empty string if none."""
371
372         if self.buf:
373             return self.buf[0]                                          # ===>
374
375         if self.eof:
376             return ''                                                   # ===>
377
378         sel = select.select([self.fd], [], [self.fd], 0)
379         if sel[2]:
380             self.eof = 1
381         if sel[0]:
382             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
383             return self.buf[0]          # Assume select don't lie.
384         else: return ''                                                 # ===>
385
386
387     def readPendingChar(self):
388         """Consume first character of unconsumed output from file, or empty
389         string if none."""
390
391         if self.buf:
392             got, self.buf = self.buf[0], self.buf[1:]
393             return got                                                  # ===>
394
395         if self.eof:
396             return ''                                                   # ===>
397
398         sel = select.select([self.fd], [], [self.fd], 0)
399         if sel[2]:
400             self.eof = 1
401         if sel[0]:
402             return os.read(self.fd, 1)                                  # ===>
403         else: return ''                                                 # ===>
404
405     def readPendingChars(self, max=None):
406         """Consume uncomsumed output from FILE, or empty string if nothing
407         pending."""
408
409         got = ""
410         if self.buf:
411             if (max > 0) and (len(self.buf) > max):
412                 got = self.buf[0:max]
413                 self.buf = self.buf[max:]
414             else:
415                 got, self.buf = self.buf, ''
416             return got
417
418         if self.eof:
419             return ''
420
421         sel = select.select([self.fd], [], [self.fd], 0)
422         if sel[2]:
423             self.eof = 1
424         if sel[0]:
425             got = got + os.read(self.fd, self.chunkSize)
426             if max == 0:
427                 self.buf = got
428                 return ''
429             elif max == None:
430                 return got
431             elif len(got) > max:
432                 self.buf = self.buf + got[max:]
433                 return got[:max]
434             else:
435                 return got
436         else: return ''
437
438     def readPendingLine(self, block=0):
439         """Return pending output from FILE, up to first newline (inclusive).
440
441         Does not block unless optional arg BLOCK is true.
442
443         Note that an error will be raised if a new eof is encountered without
444         any newline."""
445
446         if self.buf:
447             to = string.find(self.buf, '\n')
448             if to != -1:
449                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
450                 return got                                              # ===>
451             got, self.buf = self.buf, ''
452         else:
453             if self.eof:
454                 return ''                                               # ===>
455             got = ''
456
457         # Herein, 'got' contains the (former) contents of the buffer, and it
458         # doesn't include a newline.
459         fdlist = [self.fd]
460         period = block and 1.0 or 0     # don't be too busy while waiting
461         while 1:                        # (we'll only loop if block set)
462             sel = select.select(fdlist, [], fdlist, period)
463             if sel[2]:
464                 self.eof = 1
465             if sel[0]:
466                 got = got + os.read(self.fd, self.chunkSize)
467
468             to = string.find(got, '\n')
469             if to != -1:
470                 got, self.buf = got[:to+1], got[to+1:]
471                 return got                                              # ===>
472             if not block:
473                 return got                                              # ===>
474             if self.eof:
475                 self.buf = ''           # this is how an ordinary file acts...
476                 return got
477             # otherwise - no newline, blocking requested, no eof - loop. # ==^
478
479     def readline(self):
480         """Return next output line from file, blocking until it is received."""
481
482         return self.readPendingLine(1)                                  # ===>
483
484
485 #############################################################################
486 #####                 Encapsulated reading and writing                  #####
487 #############################################################################
488 # Encapsulate messages so the end can be unambiguously identified, even
489 # when they contain multiple, possibly empty lines.
490
491 class RecordFile:
492     """Encapsulate stream object for record-oriented IO.
493
494     Particularly useful when dealing with non-line oriented communications
495     over pipes, eg with subprocesses."""
496
497     # Message is written preceded by a line containing the message length.
498
499     def __init__(self, f):
500         self.file = f
501
502     def write_record(self, s):
503         "Write so self.read knows exactly how much to read."
504         f = self.__dict__['file']
505         f.write("%s\n%s" % (len(s), s))
506         if hasattr(f, 'flush'):
507             f.flush()
508
509     def read_record(self):
510         "Read and reconstruct message as prepared by self.write."
511         f = self.__dict__['file']
512         line = f.readline()[:-1]
513         if line:
514             try:
515                 l = string.atoi(line)
516             except ValueError:
517                 raise IOError(("corrupt %s file structure"
518                                 % self.__class__.__name__))
519             return f.read(l)
520         else:
521             # EOF.
522             return ''
523
524     def __getattr__(self, attr):
525         """Implement characteristic file object attributes."""
526         f = self.__dict__['file']
527         if hasattr(f, attr):
528             return getattr(f, attr)
529         else:
530             raise AttributeError(attr)
531
532     def __repr__(self):
533         return "<%s of %s at %s>" % (self.__class__.__name__,
534                                      self.__dict__['file'],
535                                      hex(id(self))[2:])
536
537 def record_trial(s):
538     """Exercise encapsulated write/read with an arbitrary string.
539
540     Raise IOError if the string gets distorted through transmission!"""
541     from StringIO import StringIO
542     sf = StringIO()
543     c = RecordFile(sf)
544     c.write(s)
545     c.seek(0)
546     r = c.read()
547     show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
548     if r != s:
549         raise IOError("String distorted:\n%s" % show)
550
551 #############################################################################
552 #####                   An example subprocess interfaces                #####
553 #############################################################################
554
555 class Ph:
556     """Convenient interface to CCSO 'ph' nameserver subprocess.
557
558     .query('string...') method takes a query and returns a list of dicts, each
559     of which represents one entry."""
560
561     # Note that i made this a class that handles a subprocess object, rather
562     # than one that inherits from it.  I didn't see any functional
563     # disadvantages, and didn't think that full support of the entire
564     # Subprocess functionality was in any way suitable for interaction with
565     # this specialized interface.  ?  klm 13-Jan-1995
566
567     def __init__(self):
568         try:
569             self.proc = Subprocess('ph', 1)
570         except:
571             raise SubprocessError('failure starting ph: %s' %         # ===>
572                                     str(sys.exc_value))
573
574     def query(self, q):
575         """Send a query and return a list of dicts for responses.
576
577         Raise a ValueError if ph responds with an error."""
578
579         self.clear()
580
581         self.proc.writeline('query ' + q)
582         got = []; it = {}
583         while 1:
584             response = self.getreply()  # Should get null on new prompt.
585             errs = self.proc.readPendingErrChars()
586             if errs:
587                 sys.stderr.write(errs)
588             if it:
589                 got.append(it)
590                 it = {}
591             if not response:
592                 return got                                              # ===>
593             elif type(response) == types.StringType:
594                 raise ValueError("ph failed match: '%s'" % response)    # ===>
595             for line in response:
596                 # convert to a dict:
597                 line = string.splitfields(line, ':')
598                 it[string.strip(line[0])] = (
599                     string.strip(string.join(line[1:])))
600
601     def getreply(self):
602         """Consume next response from ph, returning list of lines or string
603         err."""
604         # Key on first char:  (First line may lack newline.)
605         #  - dash       discard line
606         #  - 'ph> '     conclusion of response
607         #  - number     error message
608         #  - whitespace beginning of next response
609
610         nextChar = self.proc.waitForPendingChar(60)
611         if not nextChar:
612             raise SubprocessError('ph subprocess not responding')       # ===>
613         elif nextChar == '-':
614             # dashed line - discard it, and continue reading:
615             self.proc.readline()
616             return self.getreply()                                      # ===>
617         elif nextChar == 'p':
618             # 'ph> ' prompt - don't think we should hit this, but what the hay:
619             return ''                                                   # ===>
620         elif nextChar in '0123456789':
621             # Error notice - we're currently assuming single line errors:
622             return self.proc.readline()[:-1]                            # ===>
623         elif nextChar in ' \t':
624             # Get content, up to next dashed line:
625             got = []
626             while nextChar != '-' and nextChar != '':
627                 got.append(self.proc.readline()[:-1])
628                 nextChar = self.proc.peekPendingChar()
629             return got
630     def __repr__(self):
631         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
632                                              hex(id(self))[2:])
633     def clear(self):
634         """Clear-out initial preface or residual subproc input and output."""
635         pause = .5; maxIter = 10        # 5 seconds to clear
636         iterations = 0
637         got = ''
638         self.proc.write('')
639         while iterations < maxIter:
640             got = got + self.proc.readPendingChars()
641             # Strip out all but the last incomplete line:
642             got = string.splitfields(got, '\n')[-1]
643             if got == 'ph> ': return    # Ok.                             ===>
644             time.sleep(pause)
645         raise SubprocessError('ph not responding within %s secs' %
646                                 pause * maxIter)
647
648 #############################################################################
649 #####                             Test                                  #####
650 #############################################################################
651
652 def test(p=0):
653     print("\tOpening bogus subprocess, should fail:")
654     try:
655         b = Subprocess('/', 1)
656         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
657     except SubprocessError:
658         print("\t...yep, it failed.")
659     print("\tOpening cat subprocess:")
660     p = Subprocess('cat', 1)            # set to expire noisily...
661     print(p)
662     print('\tWrite, then read, two newline-teriminated lines, using readline:')
663     p.write('first full line written\n'); p.write('second.\n')
664     print(repr(p.readline()))
665     print(repr(p.readline()))
666     print('\tThree lines, last sans newline, read using combination:')
667     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
668     print('\tFirst line via readline:')
669     print(repr(p.readline()))
670     print('\tRest via readPendingChars:')
671     print(p.readPendingChars())
672     print("\tStopping then continuing subprocess (verbose):")
673     if not p.stop(1):                   # verbose stop
674         print('\t** Stop seems to have failed!')
675     else:
676         print('\tWriting line while subprocess is paused...')
677         p.write('written while subprocess paused\n')
678         print('\tNonblocking read of paused subprocess (should be empty):')
679         print(p.readPendingChars())
680         print('\tContinuing subprocess (verbose):')
681         if not p.cont(1):               # verbose continue
682             print('\t** Continue seems to have failed!  Probly lost subproc...')
683             return p
684         else:
685             print('\tReading accumulated line, blocking read:')
686             print(p.readline())
687             print("\tDeleting subproc, which was set to die noisily:")
688             del p
689             print("\tDone.")
690             return None
691
692 if __name__ == '__main__':
693     test()