]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Feat(Python3): Fix lambda
[bookmarks_db.git] / subproc.py
1 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
2
3 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
4 routines.
5
6 Subprocess class features:
7
8  - provides non-blocking stdin and stderr reads
9
10  - provides subprocess stop and continue, kill-on-deletion
11
12  - provides detection of subprocess startup failure
13
14  - Subprocess objects have nice, informative string rep (as every good object
15    ought)."""
16
17 __version__ = "Revision: 1.15 "
18
19 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp 
20 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
21
22 # Prior art: Initially based python code examples demonstrating usage of pipes
23 #            and subprocesses, primarily one by jose pereira.
24
25 # Implementation notes:
26 # - I'm not using the fcntl module to implement non-blocking file descriptors,
27 #   because i don't know what all in it is portable and what is not.  I'm not
28 #   about to provide for different platform contingencies - at that extent, the
29 #   effort would be better spent hacking 'expect' into python.
30 # - Todo? - Incorporate an error-output handler approach, where error output is
31 #           checked on regular IO, when a handler is defined, and passed to the
32 #           handler (eg for printing) immediately as it shows...
33 # - Detection of failed subprocess startup is a gross kludge, at present.
34
35 # - new additions (1.3, 1.4):
36 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
37 #    reads based solely on os.read with select, while capitalizing big-time on
38 #    multi-char read chunking.
39 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
40 #
41 # ken.manheimer@nist.gov
42
43
44 import sys, os, string, time, types
45 import select
46 import signal
47
48
49 SubprocessError = 'SubprocessError'
50 # You may need to increase execvp_grace_seconds, if you have a large or slow
51 # path to search:
52 execvp_grace_seconds = 0.5
53
54 class Subprocess:
55     """Run and communicate asynchronously with a subprocess.
56
57     Provides non-blocking reads in the form of .readPendingChars and
58     .readPendingLine.
59
60     .readline will block until it gets a complete line.
61
62     .peekPendingChar does a non-blocking, non-consuming read for pending
63     output, and can be used before .readline to check non-destructively for
64     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
65     a new character is pending, or timeout secs pass, with granularity of
66     pollPause seconds.
67
68     There are corresponding read and peekPendingErrXXX routines, to read from
69     the subprocess stderr stream."""
70
71     pid = 0
72     cmd = ''
73     expire_noisily = 1                  # Announce subproc destruction?
74     pipefiles = []
75     readbuf = 0                         # fork will assign to be a readbuf obj
76     errbuf = 0                          # fork will assign to be a readbuf obj
77
78     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
79                  in_fd=0, out_fd=1, err_fd=2):
80         """Launch a subprocess, given command string COMMAND."""
81         self.cmd = cmd
82         self.pid = 0
83         self.expire_noisily = expire_noisily
84         self.control_stderr = control_stderr
85         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
86         self.fork()
87
88     def fork(self, cmd=None):
89         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
90         if cmd: self.cmd = cmd
91         else: cmd = self.cmd
92         cmd = string.split(self.cmd)
93         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
94         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
95         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
96         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
97
98         self.pid = os.fork()
99
100         if self.pid == 0:       #### CHILD ####
101             parentErr = os.dup(self.in_fd) # Preserve handle on *parent* stderr
102             # Reopen stdin, out, err, on pipe ends:
103             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
104             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
105             if self.control_stderr:
106                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
107             # Ensure (within reason) stray file descriptors are closed:
108             excludes = [self.in_fd, self.out_fd, self.err_fd]
109             for i in range(4,100):
110                 if i not in excludes:
111                     try: os.close(i)
112                     except os.error: pass
113
114             try:
115                 os.execvp(cmd[0], cmd)
116                 os._exit(1)                     # Shouldn't get here
117
118             except os.error as e:
119                 if self.control_stderr:
120                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
121                 sys.stderr.write("**execvp failed, '%s'**\n" %
122                                  str(e))
123                 os._exit(1)
124             os._exit(1)                 # Shouldn't get here.
125
126         else:           ### PARENT ###
127             # Connect to the child's file descriptors, using our customized
128             # fdopen:
129             self.toChild = os.fdopen(pWc, 'w')
130             self.toChild_fdlist = [pWc]
131             self.readbuf = ReadBuf(pRc)
132             self.errbuf = ReadBuf(pRe)
133             time.sleep(execvp_grace_seconds)
134             try:
135                 pid, err = os.waitpid(self.pid, os.WNOHANG)
136             except os.error as error:
137                 errno, msg = error
138                 if errno == 10:
139                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
140                 raise SubprocessError("Subprocess failed[%d]: %s" % (errno, msg))
141             if pid == self.pid:
142                 # child exited already
143                 self.pid == None
144                 sig = err & 0xff
145                 rc = (err & 0xff00) >> 8
146                 if sig:
147                     raise SubprocessError(
148                     "child killed by signal %d with a return code of %d"
149                     % (sig, rc))
150                 if rc:
151                     raise SubprocessError(
152                           "child exited with return code %d" % rc)
153                 # Child may have exited, but not in error, so we won't say
154                 # anything more at this point.
155
156     ### Write input to subprocess ###
157
158     def write(self, str):
159         """Write a STRING to the subprocess."""
160
161         if not self.pid:
162             raise SubprocessError("no child")                           # ===>
163         if select.select([],self.toChild_fdlist,[],0)[1]:
164             self.toChild.write(str)
165             self.toChild.flush()
166         else:
167             # XXX Can write-buffer full be handled better??
168             raise IOError("write to %s blocked" % self)                 # ===>
169
170     def writeline(self, line=''):
171         """Write STRING, with added newline termination, to subprocess."""
172         self.write(line + '\n')
173
174     ### Get output from subprocess ###
175
176     def peekPendingChar(self):
177         """Return, but (effectively) do not consume a single pending output
178         char, or return null string if none pending."""
179
180         return self.readbuf.peekPendingChar()                           # ===>
181     def peekPendingErrChar(self):
182         """Return, but (effectively) do not consume a single pending output
183         char, or return null string if none pending."""
184
185         return self.errbuf.peekPendingChar()                            # ===>
186
187     def waitForPendingChar(self, timeout, pollPause=.1):
188         """Block max TIMEOUT secs until we peek a pending char, returning the
189         char, or '' if none encountered.
190
191         Pause POLLPAUSE secs (default .1) between polls."""
192
193         accume = 0
194         while 1:
195             nextChar = self.readbuf.peekPendingChar()
196             if nextChar or (accume > timeout): return nextChar
197             time.sleep(pollPause)
198             accume = accume + pollPause
199
200     def read(self, n=None):
201         """Read N chars, or all pending if no N specified."""
202         if n == None:
203             return self.readPendingChars()
204         got = ''
205         while n:
206             got0 = self.readPendingChars(n)
207             got = got + got0
208             n = n - len(got0)
209         return got      
210     def readPendingChars(self, max=None):
211         """Read all currently pending subprocess output as a single string."""
212         return self.readbuf.readPendingChars(max)
213     def readPendingErrChars(self):
214         """Read all currently pending subprocess error output as a single
215         string."""
216         if self.control_stderr:
217             return self.errbuf.readPendingChars()
218         else:
219             raise SubprocessError("Haven't grabbed subprocess error stream.")
220
221     def readPendingLine(self):
222         """Read currently pending subprocess output, up to a complete line
223         (newline inclusive)."""
224         return self.readbuf.readPendingLine()
225     def readPendingErrLine(self):
226         """Read currently pending subprocess error output, up to a complete
227         line (newline inclusive)."""
228         if self.control_stderr:
229             return self.errbuf.readPendingLine()
230         else:
231             raise SubprocessError("Haven't grabbed subprocess error stream.")
232
233     def readline(self):
234         """Return next complete line of subprocess output, blocking until
235         then."""
236         return self.readbuf.readline()
237     def readlineErr(self):
238         """Return next complete line of subprocess error output, blocking until
239         then."""
240         if self.control_stderr:
241             return self.errbuf.readline()
242         else:
243             raise SubprocessError("Haven't grabbed subprocess error stream.")
244
245     ### Subprocess Control ###
246
247     def active(self):
248         """True if subprocess is alive and kicking."""
249         return self.status(boolean=1)
250     def status(self, boolean=0):
251         """Return string indicating whether process is alive or dead."""
252         active = 0
253         if not self.cmd:
254             status = 'sans command'
255         elif not self.pid:
256             status = 'sans process'
257         elif not self.cont():
258             status = "(unresponding) '%s'" % self.cmd
259         else:
260             status = "'%s'" % self.cmd
261             active = 1
262         if boolean:
263             return active
264         else:
265             return status
266
267     def stop(self, verbose=1):
268         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
269         otherwise."""
270         try:
271             os.kill(self.pid, signal.SIGSTOP)
272         except os.error:
273             if verbose:
274                 print("Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
275             return 0
276         if verbose: print("Stopped '%s'" % self.cmd)
277         return 'stopped'
278
279     def cont(self, verbose=0):
280         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
281         otherwise."""
282         try:
283             os.kill(self.pid, signal.SIGCONT)
284         except os.error:
285             if verbose:
286                 print(("Continue failed for '%s' - '%s'" %
287                        (self.cmd, sys.exc_value)))
288             return 0
289         if verbose: print("Continued '%s'" % self.cmd)
290         return 'continued'
291
292     def die(self):
293         """Send process PID signal SIG (default 9, 'kill'), returning None once
294          it is successfully reaped.
295
296         SubprocessError is raised if process is not successfully killed."""
297
298         if not self.pid:
299             raise SubprocessError("No process")                         # ===>
300         elif not self.cont():
301             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
302
303         # Try sending first a TERM and then a KILL signal.
304         keep_trying = 1
305         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
306         for sig in sigs:
307             try:
308                 os.kill(self.pid, sig[1])
309             except posix.error:
310                 # keep trying
311                 pass
312             # Try a couple or three times to reap the process with waitpid:
313             for i in range(5):
314                 # WNOHANG == 1 on sunos, presumably same elsewhere.
315                 if os.waitpid(self.pid, os.WNOHANG):
316                     if self.expire_noisily:
317                         print(("\n(%s subproc %d '%s' / %s)" %
318                                (sig[0], self.pid, self.cmd,
319                                 hex(id(self))[2:])))
320                     for i in self.pipefiles:
321                         os.close(i)
322                     self.pid = 0
323                     return None                                         # ===>
324                 time.sleep(.1)
325         # Only got here if subprocess is not gone:
326         raise SubprocessError(
327                "Failed kill of subproc %d, '%s', with signals %s" %
328                 (self.pid, self.cmd, map(lambda x: x[0], sigs)))
329
330     def __del__(self):
331         """Terminate the subprocess"""
332         if self.pid:
333             self.die()
334
335     def __repr__(self):
336         status = self.status()
337         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
338
339 #############################################################################
340 #####                 Non-blocking read operations                      #####
341 #############################################################################
342
343 class ReadBuf:
344     """Output buffer for non-blocking reads on selectable files like pipes and
345     sockets.  Init with a file descriptor for the file."""
346
347     def __init__(self, fd, maxChunkSize=1024):
348         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
349         (default 1024)."""
350
351         if fd < 0:
352             raise ValueError
353         self.fd = fd
354         self.eof = 0                    # May be set with stuff still in .buf
355         self.buf = ''
356         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
357
358     def fileno(self):
359         return self.fd
360
361     def peekPendingChar(self):
362         """Return, but don't consume, first character of unconsumed output from
363         file, or empty string if none."""
364
365         if self.buf:
366             return self.buf[0]                                          # ===>
367
368         if self.eof:
369             return ''                                                   # ===>
370
371         sel = select.select([self.fd], [], [self.fd], 0)
372         if sel[2]:
373             self.eof = 1
374         if sel[0]:
375             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
376             return self.buf[0]          # Assume select don't lie.
377         else: return ''                                                 # ===>
378
379
380     def readPendingChar(self):
381         """Consume first character of unconsumed output from file, or empty
382         string if none."""
383
384         if self.buf:
385             got, self.buf = self.buf[0], self.buf[1:]
386             return got                                                  # ===>
387
388         if self.eof:
389             return ''                                                   # ===>
390
391         sel = select.select([self.fd], [], [self.fd], 0)
392         if sel[2]:
393             self.eof = 1
394         if sel[0]:
395             return os.read(self.fd, 1)                                  # ===>
396         else: return ''                                                 # ===>
397
398     def readPendingChars(self, max=None):
399         """Consume uncomsumed output from FILE, or empty string if nothing
400         pending."""
401
402         got = ""
403         if self.buf:
404              if (max > 0) and (len(self.buf) > max):
405                  got = self.buf[0:max]
406                  self.buf = self.buf[max:]
407              else:
408                  got, self.buf = self.buf, ''
409              return got                                         
410
411         if self.eof:
412              return ''
413
414         sel = select.select([self.fd], [], [self.fd], 0)
415         if sel[2]:
416             self.eof = 1
417         if sel[0]:
418             got = got + os.read(self.fd, self.chunkSize)
419             if max == 0:
420                 self.buf = got
421                 return ''
422             elif max == None:
423                 return got
424             elif len(got) > max:
425                 self.buf = self.buf + got[max:]
426                 return got[:max]
427             else:
428                 return got
429         else: return ''
430
431     def readPendingLine(self, block=0):
432         """Return pending output from FILE, up to first newline (inclusive).
433
434         Does not block unless optional arg BLOCK is true.
435
436         Note that an error will be raised if a new eof is encountered without
437         any newline."""
438
439         if self.buf:
440             to = string.find(self.buf, '\n')
441             if to != -1:
442                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
443                 return got                                              # ===>
444             got, self.buf = self.buf, ''
445         else:
446             if self.eof:
447                 return ''                                               # ===>
448             got = ''
449
450         # Herein, 'got' contains the (former) contents of the buffer, and it
451         # doesn't include a newline.
452         fdlist = [self.fd]
453         period = block and 1.0 or 0     # don't be too busy while waiting
454         while 1:                        # (we'll only loop if block set)
455             sel = select.select(fdlist, [], fdlist, period)
456             if sel[2]:
457                 self.eof = 1
458             if sel[0]:
459                 got = got + os.read(self.fd, self.chunkSize)
460
461             to = string.find(got, '\n')
462             if to != -1:
463                 got, self.buf = got[:to+1], got[to+1:]
464                 return got                                              # ===>
465             if not block:
466                 return got                                              # ===>
467             if self.eof:
468                 self.buf = ''           # this is how an ordinary file acts...
469                 return got
470             # otherwise - no newline, blocking requested, no eof - loop. # ==^
471
472     def readline(self):
473         """Return next output line from file, blocking until it is received."""
474
475         return self.readPendingLine(1)                                  # ===>
476
477
478 #############################################################################
479 #####                 Encapsulated reading and writing                  #####
480 #############################################################################
481 # Encapsulate messages so the end can be unambiguously identified, even
482 # when they contain multiple, possibly empty lines.
483
484 class RecordFile:
485     """Encapsulate stream object for record-oriented IO.
486
487     Particularly useful when dealing with non-line oriented communications
488     over pipes, eg with subprocesses."""
489
490     # Message is written preceded by a line containing the message length.
491
492     def __init__(self, f):
493         self.file = f
494
495     def write_record(self, s):
496         "Write so self.read knows exactly how much to read."
497         f = self.__dict__['file']
498         f.write("%s\n%s" % (len(s), s))
499         if hasattr(f, 'flush'):
500             f.flush()
501
502     def read_record(self):
503         "Read and reconstruct message as prepared by self.write."
504         f = self.__dict__['file']
505         line = f.readline()[:-1]
506         if line:
507             try:
508                 l = string.atoi(line)
509             except ValueError:
510                 raise IOError(("corrupt %s file structure"
511                                 % self.__class__.__name__))
512             return f.read(l)
513         else:
514             # EOF.
515             return ''
516
517     def __getattr__(self, attr):
518         """Implement characteristic file object attributes."""
519         f = self.__dict__['file']
520         if hasattr(f, attr):
521             return getattr(f, attr)
522         else:
523             raise AttributeError(attr)
524
525     def __repr__(self):
526         return "<%s of %s at %s>" % (self.__class__.__name__,
527                                      self.__dict__['file'],
528                                      hex(id(self))[2:])
529
530 def record_trial(s):
531     """Exercise encapsulated write/read with an arbitrary string.
532
533     Raise IOError if the string gets distorted through transmission!"""
534     from StringIO import StringIO
535     sf = StringIO()
536     c = RecordFile(sf)
537     c.write(s)
538     c.seek(0)
539     r = c.read()
540     show = " start:\t %s\n end:\t %s\n" % (`s`, `r`)
541     if r != s:
542         raise IOError("String distorted:\n%s" % show)
543
544 #############################################################################
545 #####                   An example subprocess interfaces                #####
546 #############################################################################
547
548 class Ph:
549     """Convenient interface to CCSO 'ph' nameserver subprocess.
550
551     .query('string...') method takes a query and returns a list of dicts, each
552     of which represents one entry."""
553
554     # Note that i made this a class that handles a subprocess object, rather
555     # than one that inherits from it.  I didn't see any functional
556     # disadvantages, and didn't think that full support of the entire
557     # Subprocess functionality was in any way suitable for interaction with
558     # this specialized interface.  ?  klm 13-Jan-1995
559
560     def __init__(self):
561         try:
562             self.proc = Subprocess('ph', 1)
563         except:
564             raise SubprocessError('failure starting ph: %s' %         # ===>
565                                     str(sys.exc_value))
566
567     def query(self, q):
568         """Send a query and return a list of dicts for responses.
569
570         Raise a ValueError if ph responds with an error."""
571
572         self.clear()
573
574         self.proc.writeline('query ' + q)
575         got = []; it = {}
576         while 1:
577             response = self.getreply()  # Should get null on new prompt.
578             errs = self.proc.readPendingErrChars()
579             if errs:
580                 sys.stderr.write(errs)
581             if it:
582                 got.append(it)
583                 it = {}
584             if not response:
585                 return got                                              # ===>
586             elif type(response) == types.StringType:
587                 raise ValueError("ph failed match: '%s'" % response)    # ===>
588             for line in response:
589                 # convert to a dict:
590                 line = string.splitfields(line, ':')
591                 it[string.strip(line[0])] = (
592                     string.strip(string.join(line[1:])))
593         
594     def getreply(self):
595         """Consume next response from ph, returning list of lines or string
596         err."""
597         # Key on first char:  (First line may lack newline.)
598         #  - dash       discard line
599         #  - 'ph> '     conclusion of response
600         #  - number     error message
601         #  - whitespace beginning of next response
602
603         nextChar = self.proc.waitForPendingChar(60)
604         if not nextChar:
605             raise SubprocessError('ph subprocess not responding')       # ===>
606         elif nextChar == '-':
607             # dashed line - discard it, and continue reading:
608             self.proc.readline()
609             return self.getreply()                                      # ===>
610         elif nextChar == 'p':
611             # 'ph> ' prompt - don't think we should hit this, but what the hay:
612             return ''                                                   # ===>
613         elif nextChar in '0123456789':
614             # Error notice - we're currently assuming single line errors:
615             return self.proc.readline()[:-1]                            # ===>
616         elif nextChar in ' \t':
617             # Get content, up to next dashed line:
618             got = []
619             while nextChar != '-' and nextChar != '':
620                 got.append(self.proc.readline()[:-1])
621                 nextChar = self.proc.peekPendingChar()
622             return got
623     def __repr__(self):
624         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
625                                              hex(id(self))[2:])
626     def clear(self):
627         """Clear-out initial preface or residual subproc input and output."""
628         pause = .5; maxIter = 10        # 5 seconds to clear
629         iterations = 0
630         got = ''
631         self.proc.write('')
632         while iterations < maxIter:
633             got = got + self.proc.readPendingChars()
634             # Strip out all but the last incomplete line:
635             got = string.splitfields(got, '\n')[-1]
636             if got == 'ph> ': return    # Ok.                             ===>
637             time.sleep(pause)
638         raise SubprocessError('ph not responding within %s secs' %
639                                 pause * maxIter)
640
641 #############################################################################
642 #####                             Test                                  #####
643 #############################################################################
644
645 def test(p=0):
646     print("\tOpening subprocess:")
647     p = Subprocess('cat', 1)            # set to expire noisily...
648     print(p)
649     print("\tOpening bogus subprocess, should fail:")
650     try:
651         b = Subprocess('/', 1)
652         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
653     except SubprocessError:
654         print("\t...yep, it failed.")
655     print('\tWrite, then read, two newline-teriminated lines, using readline:')
656     p.write('first full line written\n'); p.write('second.\n')
657     print(`p.readline()`)
658     print(`p.readline()`)
659     print('\tThree lines, last sans newline, read using combination:')
660     p.write('first\n'); p.write('second\n'); p.write('third, (no cr)')
661     print('\tFirst line via readline:')
662     print(`p.readline()`)
663     print('\tRest via readPendingChars:')
664     print(p.readPendingChars())
665     print("\tStopping then continuing subprocess (verbose):")
666     if not p.stop(1):                   # verbose stop
667         print('\t** Stop seems to have failed!')
668     else:
669         print('\tWriting line while subprocess is paused...')
670         p.write('written while subprocess paused\n')
671         print('\tNonblocking read of paused subprocess (should be empty):')
672         print(p.readPendingChars())
673         print('\tContinuing subprocess (verbose):')
674         if not p.cont(1):               # verbose continue
675             print('\t** Continue seems to have failed!  Probly lost subproc...')
676             return p
677         else:
678             print('\tReading accumulated line, blocking read:')
679             print(p.readline())
680             print("\tDeleting subproc, which was set to die noisily:")
681             del p
682             print("\tDone.")
683             return None