]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
886dc4727a32f7c956ef255fbb00361916c9b996
[bookmarks_db.git] / subproc.py
1 #! /usr/bin/env python
2
3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
4
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 routines.
7
8 Subprocess class features:
9
10  - provides non-blocking stdin and stderr reads
11
12  - provides subprocess stop and continue, kill-on-deletion
13
14  - provides detection of subprocess startup failure
15
16  - Subprocess objects have nice, informative string rep (as every good object
17    ought)."""
18
19 __version__ = "Revision: 2.0 "
20
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
23
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 #            and subprocesses, primarily one by jose pereira.
26
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 #   because i don't know what all in it is portable and what is not.  I'm not
30 #   about to provide for different platform contingencies - at that extent, the
31 #   effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 #           checked on regular IO, when a handler is defined, and passed to the
34 #           handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
36
37 # - new additions (1.3, 1.4):
38 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 #    reads based solely on os.read with select, while capitalizing big-time on
40 #    multi-char read chunking.
41 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
42 #
43 # ken.manheimer@nist.gov
44
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
48
49 import os
50 import select
51 import signal
52 import string
53 import sys
54 import time
55
56
57 class SubprocessError(Exception):
58     pass
59
60
61 # You may need to increase execvp_grace_seconds, if you have a large or slow
62 # path to search:
63 execvp_grace_seconds = 0.5
64
65
66 class Subprocess:
67     """Run and communicate asynchronously with a subprocess.
68
69     Provides non-blocking reads in the form of .readPendingChars and
70     .readPendingLine.
71
72     .readline will block until it gets a complete line.
73
74     .peekPendingChar does a non-blocking, non-consuming read for pending
75     output, and can be used before .readline to check non-destructively for
76     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
77     a new character is pending, or timeout secs pass, with granularity of
78     pollPause seconds.
79
80     There are corresponding read and peekPendingErrXXX routines, to read from
81     the subprocess stderr stream."""
82
83     pid = 0
84     cmd = ''
85     expire_noisily = 1                  # Announce subproc destruction?
86     pipefiles = []
87     readbuf = 0                         # fork will assign to be a readbuf obj
88     errbuf = 0                          # fork will assign to be a readbuf obj
89
90     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
91                  in_fd=0, out_fd=1, err_fd=2):
92         """Launch a subprocess, given command string COMMAND."""
93         self.cmd = cmd
94         self.pid = 0
95         self.expire_noisily = expire_noisily
96         self.control_stderr = control_stderr
97         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
98         self.fork()
99
100     def fork(self, cmd=None):
101         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
102         if cmd:
103             self.cmd = cmd
104         cmd = string.split(self.cmd)
105         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
106         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
107         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
108         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
109
110         self.pid = os.fork()
111
112         if self.pid == 0:       #### CHILD #### noqa: E262
113             # Preserve handle on *parent* stderr
114             parentErr = os.dup(self.in_fd)
115             # Reopen stdin, out, err, on pipe ends:
116             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
117             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
118             if self.control_stderr:
119                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
120             # Ensure (within reason) stray file descriptors are closed:
121             excludes = [self.in_fd, self.out_fd, self.err_fd]
122             for i in range(4, 100):
123                 if i not in excludes:
124                     try:
125                         os.close(i)
126                     except os.error:
127                         pass
128
129             try:
130                 os.execvp(cmd[0], cmd)
131                 os._exit(1)                     # Shouldn't get here
132
133             except os.error as e:
134                 if self.control_stderr:
135                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
136                 sys.stderr.write("**execvp failed, '%s'**\n" %
137                                  str(e))
138                 os._exit(1)
139             os._exit(1)                 # Shouldn't get here.
140
141         else:           ### PARENT ### noqa: E262
142             # Connect to the child's file descriptors, using our customized
143             # fdopen:
144             self.toChild = os.fdopen(pWc, 'w')
145             self.toChild_fdlist = [pWc]
146             self.readbuf = ReadBuf(pRc)
147             self.errbuf = ReadBuf(pRe)
148             time.sleep(execvp_grace_seconds)
149             try:
150                 pid, err = os.waitpid(self.pid, os.WNOHANG)
151             except os.error as error:
152                 errno, msg = error
153                 if errno == 10:
154                     self.pid = None
155                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
156                 self.pid = None
157                 raise SubprocessError(
158                     "Subprocess failed[%d]: %s" % (errno, msg))
159             if pid == self.pid:
160                 # child exited already
161                 self.pid = None
162                 sig = err & 0xff
163                 rc = (err & 0xff00) >> 8
164                 if sig:
165                     raise SubprocessError(
166                         "child killed by signal %d with a return code of %d"
167                         % (sig, rc))
168                 if rc:
169                     raise SubprocessError(
170                           "child exited with return code %d" % rc)
171                 # Child may have exited, but not in error, so we won't say
172                 # anything more at this point.
173
174     ### Write input to subprocess ### noqa: E266
175
176     def write(self, str):
177         """Write a STRING to the subprocess."""
178
179         if not self.pid:
180             raise SubprocessError("no child")                           # ===>
181         if select.select([], self.toChild_fdlist, [], 0)[1]:
182             self.toChild.write(str)
183             self.toChild.flush()
184         else:
185             # XXX Can write-buffer full be handled better??
186             raise IOError("write to %s blocked" % self)                 # ===>
187
188     def writeline(self, line=''):
189         """Write STRING, with added newline termination, to subprocess."""
190         self.write(line + '\n')
191
192     ### Get output from subprocess ### noqa: E266
193
194     def peekPendingChar(self):
195         """Return, but (effectively) do not consume a single pending output
196         char, or return null string if none pending."""
197
198         return self.readbuf.peekPendingChar()                           # ===>
199
200     def peekPendingErrChar(self):
201         """Return, but (effectively) do not consume a single pending output
202         char, or return null string if none pending."""
203
204         return self.errbuf.peekPendingChar()                            # ===>
205
206     def waitForPendingChar(self, timeout, pollPause=.1):
207         """Block max TIMEOUT secs until we peek a pending char, returning the
208         char, or '' if none encountered.
209
210         Pause POLLPAUSE secs (default .1) between polls."""
211
212         accume = 0
213         while 1:
214             nextChar = self.readbuf.peekPendingChar()
215             if nextChar or (accume > timeout):
216                 return nextChar
217             time.sleep(pollPause)
218             accume = accume + pollPause
219
220     def read(self, n=None):
221         """Read N chars, or all pending if no N specified."""
222         if n is None:
223             return self.readPendingChars()
224         got = ''
225         while n:
226             got0 = self.readPendingChars(n)
227             got = got + got0
228             n = n - len(got0)
229         return got
230
231     def readPendingChars(self, max=None):
232         """Read all currently pending subprocess output as a single string."""
233         return self.readbuf.readPendingChars(max)
234
235     def readPendingErrChars(self):
236         """Read all currently pending subprocess error output as a single
237         string."""
238         if self.control_stderr:
239             return self.errbuf.readPendingChars()
240         else:
241             raise SubprocessError("Haven't grabbed subprocess error stream.")
242
243     def readPendingLine(self):
244         """Read currently pending subprocess output, up to a complete line
245         (newline inclusive)."""
246         return self.readbuf.readPendingLine()
247
248     def readPendingErrLine(self):
249         """Read currently pending subprocess error output, up to a complete
250         line (newline inclusive)."""
251         if self.control_stderr:
252             return self.errbuf.readPendingLine()
253         else:
254             raise SubprocessError("Haven't grabbed subprocess error stream.")
255
256     def readline(self):
257         """Return next complete line of subprocess output, blocking until
258         then."""
259         return self.readbuf.readline()
260
261     def readlineErr(self):
262         """Return next complete line of subprocess error output, blocking until
263         then."""
264         if self.control_stderr:
265             return self.errbuf.readline()
266         else:
267             raise SubprocessError("Haven't grabbed subprocess error stream.")
268
269     ### Subprocess Control ### noqa: E266
270
271     def active(self):
272         """True if subprocess is alive and kicking."""
273         return self.status(boolean=1)
274
275     def status(self, boolean=0):
276         """Return string indicating whether process is alive or dead."""
277         active = 0
278         if not self.cmd:
279             status = 'sans command'
280         elif not self.pid:
281             status = 'sans process'
282         elif not self.cont():
283             status = "(unresponding) '%s'" % self.cmd
284         else:
285             status = "'%s'" % self.cmd
286             active = 1
287         if boolean:
288             return active
289         else:
290             return status
291
292     def stop(self, verbose=1):
293         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
294         otherwise."""
295         try:
296             os.kill(self.pid, signal.SIGSTOP)
297         except os.error:
298             if verbose:
299                 print(
300                     "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
301             return 0
302         if verbose:
303             print("Stopped '%s'" % self.cmd)
304         return 'stopped'
305
306     def cont(self, verbose=0):
307         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
308         otherwise."""
309         try:
310             os.kill(self.pid, signal.SIGCONT)
311         except os.error:
312             if verbose:
313                 print(("Continue failed for '%s' - '%s'" %
314                        (self.cmd, sys.exc_value)))
315             return 0
316         if verbose:
317             print("Continued '%s'" % self.cmd)
318         return 'continued'
319
320     def die(self):
321         """Send process PID signal SIG (default 9, 'kill'), returning None once
322          it is successfully reaped.
323
324         SubprocessError is raised if process is not successfully killed."""
325
326         if not self.pid:
327             raise SubprocessError("No process")                         # ===>
328         elif not self.cont():
329             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
330
331         # Try sending first a TERM and then a KILL signal.
332         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
333         for sig in sigs:
334             try:
335                 os.kill(self.pid, sig[1])
336             except OSError:
337                 # keep trying
338                 pass
339             # Try a couple or three times to reap the process with waitpid:
340             for i in range(5):
341                 # WNOHANG == 1 on sunos, presumably same elsewhere.
342                 if os.waitpid(self.pid, os.WNOHANG):
343                     if self.expire_noisily:
344                         print(("\n(%s subproc %d '%s' / %s)" %
345                                (sig[0], self.pid, self.cmd,
346                                 hex(id(self))[2:])))
347                     for i in self.pipefiles:
348                         try:
349                             os.fdopen(i).close()
350                         except OSError:
351                             pass
352                     del self.pipefiles[:]
353                     self.pid = 0
354                     return None                                         # ===>
355                 time.sleep(.1)
356         # Only got here if subprocess is not gone:
357         raise SubprocessError(
358                "Failed kill of subproc %d, '%s', with signals %s" %
359                (self.pid, self.cmd, map(lambda x: x[0], sigs)))
360
361     def __del__(self):
362         """Terminate the subprocess"""
363         if self.pid:
364             self.die()
365
366     def __repr__(self):
367         status = self.status()
368         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
369
370
371 ##############################################################################
372 #####              Non-blocking read operations               ##### noqa: E266
373 ##############################################################################
374
375
376 class ReadBuf:
377     """Output buffer for non-blocking reads on selectable files like pipes and
378     sockets.  Init with a file descriptor for the file."""
379
380     def __init__(self, fd, maxChunkSize=1024):
381         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
382         (default 1024)."""
383
384         if fd < 0:
385             raise ValueError
386         self.fd = fd
387         self.eof = 0                    # May be set with stuff still in .buf
388         self.buf = ''
389         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
390
391     def fileno(self):
392         return self.fd
393
394     def peekPendingChar(self):
395         """Return, but don't consume, first character of unconsumed output from
396         file, or empty string if none."""
397
398         if self.buf:
399             return self.buf[0]                                          # ===>
400
401         if self.eof:
402             return ''                                                   # ===>
403
404         sel = select.select([self.fd], [], [self.fd], 0)
405         if sel[2]:
406             self.eof = 1
407         if sel[0]:
408             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
409             return self.buf[0]          # Assume select don't lie.
410         else:
411             return ''                                                 # ===>
412
413     def readPendingChar(self):
414         """Consume first character of unconsumed output from file, or empty
415         string if none."""
416
417         if self.buf:
418             got, self.buf = self.buf[0], self.buf[1:]
419             return got                                                  # ===>
420
421         if self.eof:
422             return ''                                                   # ===>
423
424         sel = select.select([self.fd], [], [self.fd], 0)
425         if sel[2]:
426             self.eof = 1
427         if sel[0]:
428             return os.read(self.fd, 1)                                  # ===>
429         else:
430             return ''                                                 # ===>
431
432     def readPendingChars(self, max=None):
433         """Consume uncomsumed output from FILE, or empty string if nothing
434         pending."""
435
436         got = ""
437         if self.buf:
438             if (max > 0) and (len(self.buf) > max):
439                 got = self.buf[0:max]
440                 self.buf = self.buf[max:]
441             else:
442                 got, self.buf = self.buf, ''
443             return got
444
445         if self.eof:
446             return ''
447
448         sel = select.select([self.fd], [], [self.fd], 0)
449         if sel[2]:
450             self.eof = 1
451         if sel[0]:
452             got = got + os.read(self.fd, self.chunkSize)
453             if max == 0:
454                 self.buf = got
455                 return ''
456             elif max is None:
457                 return got
458             elif len(got) > max:
459                 self.buf = self.buf + got[max:]
460                 return got[:max]
461             else:
462                 return got
463         else:
464             return ''
465
466     def readPendingLine(self, block=0):
467         """Return pending output from FILE, up to first newline (inclusive).
468
469         Does not block unless optional arg BLOCK is true.
470
471         Note that an error will be raised if a new eof is encountered without
472         any newline."""
473
474         if self.buf:
475             to = string.find(self.buf, '\n')
476             if to != -1:
477                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
478                 return got                                              # ===>
479             got, self.buf = self.buf, ''
480         else:
481             if self.eof:
482                 return ''                                               # ===>
483             got = ''
484
485         # Herein, 'got' contains the (former) contents of the buffer, and it
486         # doesn't include a newline.
487         fdlist = [self.fd]
488         period = block and 1.0 or 0     # don't be too busy while waiting
489         while 1:                        # (we'll only loop if block set)
490             sel = select.select(fdlist, [], fdlist, period)
491             if sel[2]:
492                 self.eof = 1
493             if sel[0]:
494                 got = got + os.read(self.fd, self.chunkSize)
495
496             to = string.find(got, '\n')
497             if to != -1:
498                 got, self.buf = got[:to+1], got[to+1:]
499                 return got                                              # ===>
500             if not block:
501                 return got                                              # ===>
502             if self.eof:
503                 self.buf = ''           # this is how an ordinary file acts...
504                 return got
505             # otherwise - no newline, blocking requested, no eof - loop. # ==^
506
507     def readline(self):
508         """Return next output line from file, blocking until it is received."""
509
510         return self.readPendingLine(1)                                  # ===>
511
512
513 #############################################################################
514 #####            Encapsulated reading and writing            ##### noqa: E266
515 #############################################################################
516 # Encapsulate messages so the end can be unambiguously identified, even
517 # when they contain multiple, possibly empty lines.
518
519
520 class RecordFile:
521     """Encapsulate stream object for record-oriented IO.
522
523     Particularly useful when dealing with non-line oriented communications
524     over pipes, eg with subprocesses."""
525
526     # Message is written preceded by a line containing the message length.
527
528     def __init__(self, f):
529         self.file = f
530
531     def write_record(self, s):
532         "Write so self.read knows exactly how much to read."
533         f = self.__dict__['file']
534         f.write("%s\n%s" % (len(s), s))
535         if hasattr(f, 'flush'):
536             f.flush()
537
538     def read_record(self):
539         "Read and reconstruct message as prepared by self.write."
540         f = self.__dict__['file']
541         line = f.readline()[:-1]
542         if line:
543             try:
544                 l = string.atoi(line)
545             except ValueError:
546                 raise IOError(("corrupt %s file structure"
547                                % self.__class__.__name__))
548             return f.read(l)
549         else:
550             # EOF.
551             return ''
552
553     def __getattr__(self, attr):
554         """Implement characteristic file object attributes."""
555         f = self.__dict__['file']
556         if hasattr(f, attr):
557             return getattr(f, attr)
558         else:
559             raise AttributeError(attr)
560
561     def __repr__(self):
562         return "<%s of %s at %s>" % (self.__class__.__name__,
563                                      self.__dict__['file'],
564                                      hex(id(self))[2:])
565
566
567 def record_trial(s):
568     """Exercise encapsulated write/read with an arbitrary string.
569
570     Raise IOError if the string gets distorted through transmission!"""
571     from StringIO import StringIO
572     sf = StringIO()
573     c = RecordFile(sf)
574     c.write(s)
575     c.seek(0)
576     r = c.read()
577     show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
578     if r != s:
579         raise IOError("String distorted:\n%s" % show)
580
581
582 #############################################################################
583 #####            An example subprocess interfaces            ##### noqa: E266
584 #############################################################################
585
586
587 class Ph:
588     """Convenient interface to CCSO 'ph' nameserver subprocess.
589
590     .query('string...') method takes a query and returns a list of dicts, each
591     of which represents one entry."""
592
593     # Note that i made this a class that handles a subprocess object, rather
594     # than one that inherits from it.  I didn't see any functional
595     # disadvantages, and didn't think that full support of the entire
596     # Subprocess functionality was in any way suitable for interaction with
597     # this specialized interface.  ?  klm 13-Jan-1995
598
599     def __init__(self):
600         try:
601             self.proc = Subprocess('ph', 1)
602         except:
603             raise SubprocessError('failure starting ph: %s' %         # ===>
604                                   str(sys.exc_value))
605
606     def query(self, q):
607         """Send a query and return a list of dicts for responses.
608
609         Raise a ValueError if ph responds with an error."""
610
611         self.clear()
612
613         self.proc.writeline('query ' + q)
614         got = []
615         it = {}
616         while 1:
617             response = self.getreply()  # Should get null on new prompt.
618             errs = self.proc.readPendingErrChars()
619             if errs:
620                 sys.stderr.write(errs)
621             if it:
622                 got.append(it)
623                 it = {}
624             if not response:
625                 return got                                              # ===>
626             elif isinstance(response, str):
627                 raise ValueError("ph failed match: '%s'" % response)    # ===>
628             for line in response:
629                 # convert to a dict:
630                 line = string.splitfields(line, ':')
631                 it[string.strip(line[0])] = (
632                     string.strip(string.join(line[1:])))
633
634     def getreply(self):
635         """Consume next response from ph, returning list of lines or string
636         err."""
637         # Key on first char:  (First line may lack newline.)
638         #  - dash       discard line
639         #  - 'ph> '     conclusion of response
640         #  - number     error message
641         #  - whitespace beginning of next response
642
643         nextChar = self.proc.waitForPendingChar(60)
644         if not nextChar:
645             raise SubprocessError('ph subprocess not responding')       # ===>
646         elif nextChar == '-':
647             # dashed line - discard it, and continue reading:
648             self.proc.readline()
649             return self.getreply()                                      # ===>
650         elif nextChar == 'p':
651             # 'ph> ' prompt - don't think we should hit this, but what the hay:
652             return ''                                                   # ===>
653         elif nextChar in '0123456789':
654             # Error notice - we're currently assuming single line errors:
655             return self.proc.readline()[:-1]                            # ===>
656         elif nextChar in ' \t':
657             # Get content, up to next dashed line:
658             got = []
659             while nextChar != '-' and nextChar != '':
660                 got.append(self.proc.readline()[:-1])
661                 nextChar = self.proc.peekPendingChar()
662             return got
663
664     def __repr__(self):
665         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
666                                               hex(id(self))[2:])
667
668     def clear(self):
669         """Clear-out initial preface or residual subproc input and output."""
670         pause = .5
671         maxIter = 10        # 5 seconds to clear
672         iterations = 0
673         got = ''
674         self.proc.write('')
675         while iterations < maxIter:
676             got = got + self.proc.readPendingChars()
677             # Strip out all but the last incomplete line:
678             got = string.splitfields(got, '\n')[-1]
679             if got == 'ph> ':
680                 return    # Ok.                             ===>
681             time.sleep(pause)
682         raise SubprocessError('ph not responding within %s secs' %
683                               pause * maxIter)
684
685
686 ##############################################################################
687 #####                          Test                           ##### noqa: E266
688 ##############################################################################
689
690
691 def test(p=0):
692     print("\tOpening bogus subprocess, should fail:")
693     try:
694         Subprocess('/', 1)
695         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
696     except SubprocessError:
697         print("\t...yep, it failed.")
698     print("\tOpening cat subprocess:")
699     p = Subprocess('cat', 1)            # set to expire noisily...
700     print(p)
701     print('\tWrite, then read, two newline-teriminated lines, using readline:')
702     p.write('first full line written\n')
703     p.write('second.\n')
704     print(repr(p.readline()))
705     print(repr(p.readline()))
706     print('\tThree lines, last sans newline, read using combination:')
707     p.write('first\n')
708     p.write('second\n')
709     p.write('third, (no cr)')
710     print('\tFirst line via readline:')
711     print(repr(p.readline()))
712     print('\tRest via readPendingChars:')
713     print(p.readPendingChars())
714     print("\tStopping then continuing subprocess (verbose):")
715     if not p.stop(1):                   # verbose stop
716         print('\t** Stop seems to have failed!')
717     else:
718         print('\tWriting line while subprocess is paused...')
719         p.write('written while subprocess paused\n')
720         print('\tNonblocking read of paused subprocess (should be empty):')
721         print(p.readPendingChars())
722         print('\tContinuing subprocess (verbose):')
723         if not p.cont(1):               # verbose continue
724             print(
725                 '\t** Continue seems to have failed!  Probly lost subproc...')
726             return p
727         else:
728             print('\tReading accumulated line, blocking read:')
729             print(p.readline())
730             print("\tDeleting subproc, which was set to die noisily:")
731             del p
732             print("\tDone.")
733             return None
734
735
736 if __name__ == '__main__':
737     test()