]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Fix(storage): Adapt to the latest Mozilla format
[bookmarks_db.git] / subproc.py
1 #! /usr/bin/env python
2
3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
4
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 routines.
7
8 Subprocess class features:
9
10  - provides non-blocking stdin and stderr reads
11
12  - provides subprocess stop and continue, kill-on-deletion
13
14  - provides detection of subprocess startup failure
15
16  - Subprocess objects have nice, informative string rep (as every good object
17    ought)."""
18
19 __version__ = "Revision: 1.15 "
20
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
23
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 #            and subprocesses, primarily one by jose pereira.
26
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 #   because i don't know what all in it is portable and what is not.  I'm not
30 #   about to provide for different platform contingencies - at that extent, the
31 #   effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 #           checked on regular IO, when a handler is defined, and passed to the
34 #           handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
36
37 # - new additions (1.3, 1.4):
38 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 #    reads based solely on os.read with select, while capitalizing big-time on
40 #    multi-char read chunking.
41 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
42 #
43 # ken.manheimer@nist.gov
44
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
48
49 import sys, os, string, time, types
50 import select
51 import signal
52
53
54 class SubprocessError(Exception):
55     pass
56
57 # You may need to increase execvp_grace_seconds, if you have a large or slow
58 # path to search:
59 execvp_grace_seconds = 0.5
60
61 class Subprocess:
62     """Run and communicate asynchronously with a subprocess.
63
64     Provides non-blocking reads in the form of .readPendingChars and
65     .readPendingLine.
66
67     .readline will block until it gets a complete line.
68
69     .peekPendingChar does a non-blocking, non-consuming read for pending
70     output, and can be used before .readline to check non-destructively for
71     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
72     a new character is pending, or timeout secs pass, with granularity of
73     pollPause seconds.
74
75     There are corresponding read and peekPendingErrXXX routines, to read from
76     the subprocess stderr stream."""
77
78     pid = 0
79     cmd = ''
80     expire_noisily = 1                  # Announce subproc destruction?
81     pipefiles = []
82     readbuf = 0                         # fork will assign to be a readbuf obj
83     errbuf = 0                          # fork will assign to be a readbuf obj
84
85     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
86                  in_fd=0, out_fd=1, err_fd=2):
87         """Launch a subprocess, given command string COMMAND."""
88         self.cmd = cmd
89         self.pid = 0
90         self.expire_noisily = expire_noisily
91         self.control_stderr = control_stderr
92         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
93         self.fork()
94
95     def fork(self, cmd=None):
96         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
97         if cmd: self.cmd = cmd
98         else: cmd = self.cmd
99         cmd = string.split(self.cmd)
100         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
101         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
102         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
103         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
104
105         self.pid = os.fork()
106
107         if self.pid == 0:       #### CHILD ####
108             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
109             # Reopen stdin, out, err, on pipe ends:
110             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
111             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
112             if self.control_stderr:
113                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
114             # Ensure (within reason) stray file descriptors are closed:
115             excludes = [self.in_fd, self.out_fd, self.err_fd]
116             for i in range(4,100):
117                 if i not in excludes:
118                     try: os.close(i)
119                     except os.error: pass
120
121             try:
122                 os.execvp(cmd[0], cmd)
123                 os._exit(1)                     # Shouldn't get here
124
125             except os.error as e:
126                 if self.control_stderr:
127                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
128                 sys.stderr.write("**execvp failed, '%s'**\n" %
129                                  str(e))
130                 os._exit(1)
131             os._exit(1)                 # Shouldn't get here.
132
133         else:           ### PARENT ###
134             # Connect to the child's file descriptors, using our customized
135             # fdopen:
136             self.toChild = os.fdopen(pWc, 'w')
137             self.toChild_fdlist = [pWc]
138             self.readbuf = ReadBuf(pRc)
139             self.errbuf = ReadBuf(pRe)
140             time.sleep(execvp_grace_seconds)
141             try:
142                 pid, err = os.waitpid(self.pid, os.WNOHANG)
143             except os.error as error:
144                 errno, msg = error
145                 if errno == 10:
146                     self.pid = None
147                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
148                 self.pid = None
149                 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
150             if pid == self.pid:
151                 # child exited already
152                 self.pid == None
153                 sig = err & 0xff
154                 rc = (err & 0xff00) >> 8
155                 if sig:
156                     raise SubprocessError(
157                     "child killed by signal %d with a return code of %d"
158                     % (sig, rc))
159                 if rc:
160                     self.pid = None
161                     raise SubprocessError(
162                           "child exited with return code %d" % rc)
163                 # Child may have exited, but not in error, so we won't say
164                 # anything more at this point.
165
166     ### Write input to subprocess ###
167
168     def write(self, str):
169         """Write a STRING to the subprocess."""
170
171         if not self.pid:
172             raise SubprocessError("no child")                           # ===>
173         if select.select([],self.toChild_fdlist,[],0)[1]:
174             self.toChild.write(str)
175             self.toChild.flush()
176         else:
177             # XXX Can write-buffer full be handled better??
178             raise IOError("write to %s blocked" % self)                 # ===>
179
180     def writeline(self, line=''):
181         """Write STRING, with added newline termination, to subprocess."""
182         self.write(line + '\n')
183
184     ### Get output from subprocess ###
185
186     def peekPendingChar(self):
187         """Return, but (effectively) do not consume a single pending output
188         char, or return null string if none pending."""
189
190         return self.readbuf.peekPendingChar()                           # ===>
191     def peekPendingErrChar(self):
192         """Return, but (effectively) do not consume a single pending output
193         char, or return null string if none pending."""
194
195         return self.errbuf.peekPendingChar()                            # ===>
196
197     def waitForPendingChar(self, timeout, pollPause=.1):
198         """Block max TIMEOUT secs until we peek a pending char, returning the
199         char, or '' if none encountered.
200
201         Pause POLLPAUSE secs (default .1) between polls."""
202
203         accume = 0
204         while 1:
205             nextChar = self.readbuf.peekPendingChar()
206             if nextChar or (accume > timeout): return nextChar
207             time.sleep(pollPause)
208             accume = accume + pollPause
209
210     def read(self, n=None):
211         """Read N chars, or all pending if no N specified."""
212         if n == None:
213             return self.readPendingChars()
214         got = ''
215         while n:
216             got0 = self.readPendingChars(n)
217             got = got + got0
218             n = n - len(got0)
219         return got
220     def readPendingChars(self, max=None):
221         """Read all currently pending subprocess output as a single string."""
222         return self.readbuf.readPendingChars(max)
223     def readPendingErrChars(self):
224         """Read all currently pending subprocess error output as a single
225         string."""
226         if self.control_stderr:
227             return self.errbuf.readPendingChars()
228         else:
229             raise SubprocessError("Haven't grabbed subprocess error stream.")
230
231     def readPendingLine(self):
232         """Read currently pending subprocess output, up to a complete line
233         (newline inclusive)."""
234         return self.readbuf.readPendingLine()
235     def readPendingErrLine(self):
236         """Read currently pending subprocess error output, up to a complete
237         line (newline inclusive)."""
238         if self.control_stderr:
239             return self.errbuf.readPendingLine()
240         else:
241             raise SubprocessError("Haven't grabbed subprocess error stream.")
242
243     def readline(self):
244         """Return next complete line of subprocess output, blocking until
245         then."""
246         return self.readbuf.readline()
247     def readlineErr(self):
248         """Return next complete line of subprocess error output, blocking until
249         then."""
250         if self.control_stderr:
251             return self.errbuf.readline()
252         else:
253             raise SubprocessError("Haven't grabbed subprocess error stream.")
254
255     ### Subprocess Control ###
256
257     def active(self):
258         """True if subprocess is alive and kicking."""
259         return self.status(boolean=1)
260     def status(self, boolean=0):
261         """Return string indicating whether process is alive or dead."""
262         active = 0
263         if not self.cmd:
264             status = 'sans command'
265         elif not self.pid:
266             status = 'sans process'
267         elif not self.cont():
268             status = "(unresponding) '%s'" % self.cmd
269         else:
270             status = "'%s'" % self.cmd
271             active = 1
272         if boolean:
273             return active
274         else:
275             return status
276
277     def stop(self, verbose=1):
278         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
279         otherwise."""
280         try:
281             os.kill(self.pid, signal.SIGSTOP)
282         except os.error:
283             if verbose:
284                 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
285             return 0
286         if verbose: print("Stopped '%s'" % self.cmd)
287         return 'stopped'
288
289     def cont(self, verbose=0):
290         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
291         otherwise."""
292         try:
293             os.kill(self.pid, signal.SIGCONT)
294         except os.error:
295             if verbose:
296                 print(("Continue failed for '%s' - '%s'" %
297                        (self.cmd, sys.exc_value)))
298             return 0
299         if verbose: print("Continued '%s'" % self.cmd)
300         return 'continued'
301
302     def die(self):
303         """Send process PID signal SIG (default 9, 'kill'), returning None once
304          it is successfully reaped.
305
306         SubprocessError is raised if process is not successfully killed."""
307
308         if not self.pid:
309             raise SubprocessError("No process")                         # ===>
310         elif not self.cont():
311             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
312
313         # Try sending first a TERM and then a KILL signal.
314         keep_trying = 1
315         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
316         for sig in sigs:
317             try:
318                 os.kill(self.pid, sig[1])
319             except posix.error:
320                 # keep trying
321                 pass
322             # Try a couple or three times to reap the process with waitpid:
323             for i in range(5):
324                 # WNOHANG == 1 on sunos, presumably same elsewhere.
325                 if os.waitpid(self.pid, os.WNOHANG):
326                     if self.expire_noisily:
327                         print(("\n(%s subproc %d '%s' / %s)" %
328                                (sig[0], self.pid, self.cmd,
329                                 hex(id(self))[2:])))
330                     for i in self.pipefiles:
331                         try:
332                             fp = os.fdopen(i).close()
333                         except OSError:
334                             pass
335                     del self.pipefiles[:]
336                     self.pid = 0
337                     return None                                         # ===>
338                 time.sleep(.1)
339         # Only got here if subprocess is not gone:
340         raise SubprocessError(
341                "Failed kill of subproc %d, '%s', with signals %s" %
342                 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
343
344     def __del__(self):
345         """Terminate the subprocess"""
346         if self.pid:
347             self.die()
348
349     def __repr__(self):
350         status = self.status()
351         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
352
353 #############################################################################
354 #####                 Non-blocking read operations                      #####
355 #############################################################################
356
357 class ReadBuf:
358     """Output buffer for non-blocking reads on selectable files like pipes and
359     sockets.  Init with a file descriptor for the file."""
360
361     def __init__(self, fd, maxChunkSize=1024):
362         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
363         (default 1024)."""
364
365         if fd < 0:
366             raise ValueError
367         self.fd = fd
368         self.eof = 0                    # May be set with stuff still in .buf
369         self.buf = ''
370         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
371
372     def fileno(self):
373         return self.fd
374
375     def peekPendingChar(self):
376         """Return, but don't consume, first character of unconsumed output from
377         file, or empty string if none."""
378
379         if self.buf:
380             return self.buf[0]                                          # ===>
381
382         if self.eof:
383             return ''                                                   # ===>
384
385         sel = select.select([self.fd], [], [self.fd], 0)
386         if sel[2]:
387             self.eof = 1
388         if sel[0]:
389             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
390             return self.buf[0]          # Assume select don't lie.
391         else: return ''                                                 # ===>
392
393
394     def readPendingChar(self):
395         """Consume first character of unconsumed output from file, or empty
396         string if none."""
397
398         if self.buf:
399             got, self.buf = self.buf[0], self.buf[1:]
400             return got                                                  # ===>
401
402         if self.eof:
403             return ''                                                   # ===>
404
405         sel = select.select([self.fd], [], [self.fd], 0)
406         if sel[2]:
407             self.eof = 1
408         if sel[0]:
409             return os.read(self.fd, 1)                                  # ===>
410         else: return ''                                                 # ===>
411
412     def readPendingChars(self, max=None):
413         """Consume uncomsumed output from FILE, or empty string if nothing
414         pending."""
415
416         got = ""
417         if self.buf:
418             if (max > 0) and (len(self.buf) > max):
419                 got = self.buf[0:max]
420                 self.buf = self.buf[max:]
421             else:
422                 got, self.buf = self.buf, ''
423             return got
424
425         if self.eof:
426             return ''
427
428         sel = select.select([self.fd], [], [self.fd], 0)
429         if sel[2]:
430             self.eof = 1
431         if sel[0]:
432             got = got + os.read(self.fd, self.chunkSize)
433             if max == 0:
434                 self.buf = got
435                 return ''
436             elif max == None:
437                 return got
438             elif len(got) > max:
439                 self.buf = self.buf + got[max:]
440                 return got[:max]
441             else:
442                 return got
443         else: return ''
444
445     def readPendingLine(self, block=0):
446         """Return pending output from FILE, up to first newline (inclusive).
447
448         Does not block unless optional arg BLOCK is true.
449
450         Note that an error will be raised if a new eof is encountered without
451         any newline."""
452
453         if self.buf:
454             to = string.find(self.buf, '\n')
455             if to != -1:
456                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
457                 return got                                              # ===>
458             got, self.buf = self.buf, ''
459         else:
460             if self.eof:
461                 return ''                                               # ===>
462             got = ''
463
464         # Herein, 'got' contains the (former) contents of the buffer, and it
465         # doesn't include a newline.
466         fdlist = [self.fd]
467         period = block and 1.0 or 0     # don't be too busy while waiting
468         while 1:                        # (we'll only loop if block set)
469             sel = select.select(fdlist, [], fdlist, period)
470             if sel[2]:
471                 self.eof = 1
472             if sel[0]:
473                 got = got + os.read(self.fd, self.chunkSize)
474
475             to = string.find(got, '\n')
476             if to != -1:
477                 got, self.buf = got[:to+1], got[to+1:]
478                 return got                                              # ===>
479             if not block:
480                 return got                                              # ===>
481             if self.eof:
482                 self.buf = ''           # this is how an ordinary file acts...
483                 return got
484             # otherwise - no newline, blocking requested, no eof - loop. # ==^
485
486     def readline(self):
487         """Return next output line from file, blocking until it is received."""
488
489         return self.readPendingLine(1)                                  # ===>
490
491
492 #############################################################################
493 #####                 Encapsulated reading and writing                  #####
494 #############################################################################
495 # Encapsulate messages so the end can be unambiguously identified, even
496 # when they contain multiple, possibly empty lines.
497
498 class RecordFile:
499     """Encapsulate stream object for record-oriented IO.
500
501     Particularly useful when dealing with non-line oriented communications
502     over pipes, eg with subprocesses."""
503
504     # Message is written preceded by a line containing the message length.
505
506     def __init__(self, f):
507         self.file = f
508
509     def write_record(self, s):
510         "Write so self.read knows exactly how much to read."
511         f = self.__dict__['file']
512         f.write("%s\n%s" % (len(s), s))
513         if hasattr(f, 'flush'):
514             f.flush()
515
516     def read_record(self):
517         "Read and reconstruct message as prepared by self.write."
518         f = self.__dict__['file']
519         line = f.readline()[:-1]
520         if line:
521             try:
522                 l = string.atoi(line)
523             except ValueError:
524                 raise IOError(("corrupt %s file structure"
525                                 % self.__class__.__name__))
526             return f.read(l)
527         else:
528             # EOF.
529             return ''
530
531     def __getattr__(self, attr):
532         """Implement characteristic file object attributes."""
533         f = self.__dict__['file']
534         if hasattr(f, attr):
535             return getattr(f, attr)
536         else:
537             raise AttributeError(attr)
538
539     def __repr__(self):
540         return "<%s of %s at %s>" % (self.__class__.__name__,
541                                      self.__dict__['file'],
542                                      hex(id(self))[2:])
543
544 def record_trial(s):
545     """Exercise encapsulated write/read with an arbitrary string.
546
547     Raise IOError if the string gets distorted through transmission!"""
548     from StringIO import StringIO
549     sf = StringIO()
550     c = RecordFile(sf)
551     c.write(s)
552     c.seek(0)
553     r = c.read()
554     show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
555     if r != s:
556         raise IOError("String distorted:\n%s" % show)
557
558 #############################################################################
559 #####                   An example subprocess interfaces                #####
560 #############################################################################
561
562 class Ph:
563     """Convenient interface to CCSO 'ph' nameserver subprocess.
564
565     .query('string...') method takes a query and returns a list of dicts, each
566     of which represents one entry."""
567
568     # Note that i made this a class that handles a subprocess object, rather
569     # than one that inherits from it.  I didn't see any functional
570     # disadvantages, and didn't think that full support of the entire
571     # Subprocess functionality was in any way suitable for interaction with
572     # this specialized interface.  ?  klm 13-Jan-1995
573
574     def __init__(self):
575         try:
576             self.proc = Subprocess('ph', 1)
577         except:
578             raise SubprocessError('failure starting ph: %s' %         # ===>
579                                     str(sys.exc_value))
580
581     def query(self, q):
582         """Send a query and return a list of dicts for responses.
583
584         Raise a ValueError if ph responds with an error."""
585
586         self.clear()
587
588         self.proc.writeline('query ' + q)
589         got = []; it = {}
590         while 1:
591             response = self.getreply()  # Should get null on new prompt.
592             errs = self.proc.readPendingErrChars()
593             if errs:
594                 sys.stderr.write(errs)
595             if it:
596                 got.append(it)
597                 it = {}
598             if not response:
599                 return got                                              # ===>
600             elif type(response) == types.StringType:
601                 raise ValueError("ph failed match: '%s'" % response)    # ===>
602             for line in response:
603                 # convert to a dict:
604                 line = string.splitfields(line, ':')
605                 it[string.strip(line[0])] = (
606                     string.strip(string.join(line[1:])))
607
608     def getreply(self):
609         """Consume next response from ph, returning list of lines or string
610         err."""
611         # Key on first char:  (First line may lack newline.)
612         #  - dash       discard line
613         #  - 'ph> '     conclusion of response
614         #  - number     error message
615         #  - whitespace beginning of next response
616
617         nextChar = self.proc.waitForPendingChar(60)
618         if not nextChar:
619             raise SubprocessError('ph subprocess not responding')       # ===>
620         elif nextChar == '-':
621             # dashed line - discard it, and continue reading:
622             self.proc.readline()
623             return self.getreply()                                      # ===>
624         elif nextChar == 'p':
625             # 'ph> ' prompt - don't think we should hit this, but what the hay:
626             return ''                                                   # ===>
627         elif nextChar in '0123456789':
628             # Error notice - we're currently assuming single line errors:
629             return self.proc.readline()[:-1]                            # ===>
630         elif nextChar in ' \t':
631             # Get content, up to next dashed line:
632             got = []
633             while nextChar != '-' and nextChar != '':
634                 got.append(self.proc.readline()[:-1])
635                 nextChar = self.proc.peekPendingChar()
636             return got
637     def __repr__(self):
638         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
639                                              hex(id(self))[2:])
640     def clear(self):
641         """Clear-out initial preface or residual subproc input and output."""
642         pause = .5; maxIter = 10        # 5 seconds to clear
643         iterations = 0
644         got = ''
645         self.proc.write('')
646         while iterations < maxIter:
647             got = got + self.proc.readPendingChars()
648             # Strip out all but the last incomplete line:
649             got = string.splitfields(got, '\n')[-1]
650             if got == 'ph> ': return    # Ok.                             ===>
651             time.sleep(pause)
652         raise SubprocessError('ph not responding within %s secs' %
653                                 pause * maxIter)
654
655 #############################################################################
656 #####                             Test                                  #####
657 #############################################################################
658
659 def test(p=0):
660     print("\tOpening bogus subprocess, should fail:")
661     try:
662         b = Subprocess('/', 1)
663         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
664     except SubprocessError:
665         print("\t...yep, it failed.")
666     print("\tOpening cat subprocess:")
667     p = Subprocess('cat', 1)            # set to expire noisily...
668     print(p)
669     print('\tWrite, then read, two newline-teriminated lines, using readline:')
670     p.write('first full line written\n'); p.write('second.\n')
671     print(repr(p.readline()))
672     print(repr(p.readline()))
673     print('\tThree lines, last sans newline, read using combination:')
674     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
675     print('\tFirst line via readline:')
676     print(repr(p.readline()))
677     print('\tRest via readPendingChars:')
678     print(p.readPendingChars())
679     print("\tStopping then continuing subprocess (verbose):")
680     if not p.stop(1):                   # verbose stop
681         print('\t** Stop seems to have failed!')
682     else:
683         print('\tWriting line while subprocess is paused...')
684         p.write('written while subprocess paused\n')
685         print('\tNonblocking read of paused subprocess (should be empty):')
686         print(p.readPendingChars())
687         print('\tContinuing subprocess (verbose):')
688         if not p.cont(1):               # verbose continue
689             print('\t** Continue seems to have failed!  Probly lost subproc...')
690             return p
691         else:
692             print('\tReading accumulated line, blocking read:')
693             print(p.readline())
694             print("\tDeleting subproc, which was set to die noisily:")
695             del p
696             print("\tDone.")
697             return None
698
699 if __name__ == '__main__':
700     test()