]> git.phdru.name Git - bookmarks_db.git/blob - subproc.py
Fix(Robot): Stop splitting and un-splitting URLs
[bookmarks_db.git] / subproc.py
1 #! /usr/bin/env python
2
3 """Run a subprocess and communicate with it via stdin, stdout, and stderr.
4
5 Requires that platform supports, eg, posix-style 'os.pipe' and 'os.fork'
6 routines.
7
8 Subprocess class features:
9
10  - provides non-blocking stdin and stderr reads
11
12  - provides subprocess stop and continue, kill-on-deletion
13
14  - provides detection of subprocess startup failure
15
16  - Subprocess objects have nice, informative string rep (as every good object
17    ought)."""
18
19 __version__ = "Revision: 2.0 "
20
21 # Id: subproc.py,v 1.15 1998/12/14 20:53:16 klm Exp
22 # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995.
23
24 # Prior art: Initially based python code examples demonstrating usage of pipes
25 #            and subprocesses, primarily one by jose pereira.
26
27 # Implementation notes:
28 # - I'm not using the fcntl module to implement non-blocking file descriptors,
29 #   because i don't know what all in it is portable and what is not.  I'm not
30 #   about to provide for different platform contingencies - at that extent, the
31 #   effort would be better spent hacking 'expect' into python.
32 # - Todo? - Incorporate an error-output handler approach, where error output is
33 #           checked on regular IO, when a handler is defined, and passed to the
34 #           handler (eg for printing) immediately as it shows...
35 # - Detection of failed subprocess startup is a gross kludge, at present.
36
37 # - new additions (1.3, 1.4):
38 #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking
39 #    reads based solely on os.read with select, while capitalizing big-time on
40 #    multi-char read chunking.
41 #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted.
42 #
43 # ken.manheimer@nist.gov
44
45 # This is a modified version by Oleg Broytman <phd@phdru.name>.
46 # The original version is still preserved at
47 # https://www.python.org/ftp/python/contrib-09-Dec-1999/System/subproc.tar.gz
48
49 import os
50 import select
51 import signal
52 import sys
53 import time
54
55
56 class SubprocessError(Exception):
57     pass
58
59
60 # You may need to increase execvp_grace_seconds, if you have a large or slow
61 # path to search:
62 execvp_grace_seconds = 0.5
63
64
65 class Subprocess:
66     """Run and communicate asynchronously with a subprocess.
67
68     Provides non-blocking reads in the form of .readPendingChars and
69     .readPendingLine.
70
71     .readline will block until it gets a complete line.
72
73     .peekPendingChar does a non-blocking, non-consuming read for pending
74     output, and can be used before .readline to check non-destructively for
75     pending output.  .waitForPendingChar(timeout, pollPause=.1) blocks until
76     a new character is pending, or timeout secs pass, with granularity of
77     pollPause seconds.
78
79     There are corresponding read and peekPendingErrXXX routines, to read from
80     the subprocess stderr stream."""
81
82     pid = 0
83     cmd = ''
84     expire_noisily = 1                  # Announce subproc destruction?
85     pipefiles = []
86     readbuf = 0                         # fork will assign to be a readbuf obj
87     errbuf = 0                          # fork will assign to be a readbuf obj
88
89     def __init__(self, cmd, control_stderr=0, expire_noisily=0,
90                  in_fd=0, out_fd=1, err_fd=2):
91         """Launch a subprocess, given command string COMMAND."""
92         self.cmd = cmd
93         self.pid = 0
94         self.expire_noisily = expire_noisily
95         self.control_stderr = control_stderr
96         self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd
97         self.fork()
98
99     def fork(self, cmd=None):
100         """Fork a subprocess with designated COMMAND (default, self.cmd)."""
101         if cmd:
102             self.cmd = cmd
103         cmd = self.cmd.split()
104         pRc, cWp = os.pipe()            # parent-read-child, child-write-parent
105         cRp, pWc = os.pipe()            # child-read-parent, parent-write-child
106         pRe, cWe = os.pipe()            # parent-read-error, child-write-error
107         self.pipefiles = [pRc, cWp, cRp, pWc, pRe, cWe]
108
109         self.pid = os.fork()
110
111         if self.pid == 0:       #### CHILD #### noqa: E262
112             # Preserve handle on *parent* stderr
113             parentErr = os.dup(self.in_fd)
114             # Reopen stdin, out, err, on pipe ends:
115             os.dup2(cRp, self.in_fd)            # cRp = sys.stdin
116             os.dup2(cWp, self.out_fd)           # cWp = sys.stdout
117             if self.control_stderr:
118                 os.dup2(cWe, self.err_fd)       # cWe = sys.stderr
119             # Ensure (within reason) stray file descriptors are closed:
120             excludes = [self.in_fd, self.out_fd, self.err_fd]
121             for i in range(4, 100):
122                 if i not in excludes:
123                     try:
124                         os.close(i)
125                     except os.error:
126                         pass
127
128             try:
129                 os.execvp(cmd[0], cmd)
130                 os._exit(1)                     # Shouldn't get here
131
132             except os.error as e:
133                 if self.control_stderr:
134                     os.dup2(parentErr, 2)       # Reconnect to parent's stdout
135                 sys.stderr.write("**execvp failed, '%s'**\n" %
136                                  str(e))
137                 os._exit(1)
138             os._exit(1)                 # Shouldn't get here.
139
140         else:           ### PARENT ### noqa: E262
141             # Connect to the child's file descriptors, using our customized
142             # fdopen:
143             self.toChild = os.fdopen(pWc, 'w')
144             self.toChild_fdlist = [pWc]
145             self.readbuf = ReadBuf(pRc)
146             self.errbuf = ReadBuf(pRe)
147             time.sleep(execvp_grace_seconds)
148             try:
149                 pid, err = os.waitpid(self.pid, os.WNOHANG)
150             except os.error as error:
151                 errno, msg = error
152                 if errno == 10:
153                     self.pid = None
154                     raise SubprocessError("Subprocess '%s' failed." % self.cmd)
155                 self.pid = None
156                 raise SubprocessError(
157                     "Subprocess failed[%d]: %s" % (errno, msg))
158             if pid == self.pid:
159                 # child exited already
160                 self.pid = None
161                 sig = err & 0xff
162                 rc = (err & 0xff00) >> 8
163                 if sig:
164                     raise SubprocessError(
165                         "child killed by signal %d with a return code of %d"
166                         % (sig, rc))
167                 if rc:
168                     raise SubprocessError(
169                           "child exited with return code %d" % rc)
170                 # Child may have exited, but not in error, so we won't say
171                 # anything more at this point.
172
173     ### Write input to subprocess ### noqa: E266
174
175     def write(self, str):
176         """Write a STRING to the subprocess."""
177
178         if not self.pid:
179             raise SubprocessError("no child")                           # ===>
180         if select.select([], self.toChild_fdlist, [], 0)[1]:
181             self.toChild.write(str)
182             self.toChild.flush()
183         else:
184             # XXX Can write-buffer full be handled better??
185             raise IOError("write to %s blocked" % self)                 # ===>
186
187     def writeline(self, line=''):
188         """Write STRING, with added newline termination, to subprocess."""
189         self.write(line + '\n')
190
191     ### Get output from subprocess ### noqa: E266
192
193     def peekPendingChar(self):
194         """Return, but (effectively) do not consume a single pending output
195         char, or return null string if none pending."""
196
197         return self.readbuf.peekPendingChar()                           # ===>
198
199     def peekPendingErrChar(self):
200         """Return, but (effectively) do not consume a single pending output
201         char, or return null string if none pending."""
202
203         return self.errbuf.peekPendingChar()                            # ===>
204
205     def waitForPendingChar(self, timeout, pollPause=.1):
206         """Block max TIMEOUT secs until we peek a pending char, returning the
207         char, or '' if none encountered.
208
209         Pause POLLPAUSE secs (default .1) between polls."""
210
211         accume = 0
212         while 1:
213             nextChar = self.readbuf.peekPendingChar()
214             if nextChar or (accume > timeout):
215                 return nextChar
216             time.sleep(pollPause)
217             accume = accume + pollPause
218
219     def read(self, n=None):
220         """Read N chars, or all pending if no N specified."""
221         if n is None:
222             return self.readPendingChars()
223         got = ''
224         while n:
225             got0 = self.readPendingChars(n)
226             got = got + got0
227             n = n - len(got0)
228         return got
229
230     def readPendingChars(self, max=None):
231         """Read all currently pending subprocess output as a single string."""
232         return self.readbuf.readPendingChars(max)
233
234     def readPendingErrChars(self):
235         """Read all currently pending subprocess error output as a single
236         string."""
237         if self.control_stderr:
238             return self.errbuf.readPendingChars()
239         else:
240             raise SubprocessError("Haven't grabbed subprocess error stream.")
241
242     def readPendingLine(self):
243         """Read currently pending subprocess output, up to a complete line
244         (newline inclusive)."""
245         return self.readbuf.readPendingLine()
246
247     def readPendingErrLine(self):
248         """Read currently pending subprocess error output, up to a complete
249         line (newline inclusive)."""
250         if self.control_stderr:
251             return self.errbuf.readPendingLine()
252         else:
253             raise SubprocessError("Haven't grabbed subprocess error stream.")
254
255     def readline(self):
256         """Return next complete line of subprocess output, blocking until
257         then."""
258         return self.readbuf.readline()
259
260     def readlineErr(self):
261         """Return next complete line of subprocess error output, blocking until
262         then."""
263         if self.control_stderr:
264             return self.errbuf.readline()
265         else:
266             raise SubprocessError("Haven't grabbed subprocess error stream.")
267
268     ### Subprocess Control ### noqa: E266
269
270     def active(self):
271         """True if subprocess is alive and kicking."""
272         return self.status(boolean=1)
273
274     def status(self, boolean=0):
275         """Return string indicating whether process is alive or dead."""
276         active = 0
277         if not self.cmd:
278             status = 'sans command'
279         elif not self.pid:
280             status = 'sans process'
281         elif not self.cont():
282             status = "(unresponding) '%s'" % self.cmd
283         else:
284             status = "'%s'" % self.cmd
285             active = 1
286         if boolean:
287             return active
288         else:
289             return status
290
291     def stop(self, verbose=1):
292         """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0
293         otherwise."""
294         try:
295             os.kill(self.pid, signal.SIGSTOP)
296         except os.error:
297             if verbose:
298                 print(
299                     "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value))
300             return 0
301         if verbose:
302             print("Stopped '%s'" % self.cmd)
303         return 'stopped'
304
305     def cont(self, verbose=0):
306         """Signal subprocess with CONT (19), returning 'continued' if ok, or 0
307         otherwise."""
308         try:
309             os.kill(self.pid, signal.SIGCONT)
310         except os.error:
311             if verbose:
312                 print(("Continue failed for '%s' - '%s'" %
313                        (self.cmd, sys.exc_value)))
314             return 0
315         if verbose:
316             print("Continued '%s'" % self.cmd)
317         return 'continued'
318
319     def die(self):
320         """Send process PID signal SIG (default 9, 'kill'), returning None once
321          it is successfully reaped.
322
323         SubprocessError is raised if process is not successfully killed."""
324
325         if not self.pid:
326             raise SubprocessError("No process")                         # ===>
327         elif not self.cont():
328             raise SubprocessError("Can't signal subproc %s" % self)     # ===>
329
330         # Try sending first a TERM and then a KILL signal.
331         sigs = [('TERMinated', signal.SIGTERM), ('KILLed', signal.SIGKILL)]
332         for sig in sigs:
333             try:
334                 os.kill(self.pid, sig[1])
335             except OSError:
336                 # keep trying
337                 pass
338             # Try a couple or three times to reap the process with waitpid:
339             for i in range(5):
340                 # WNOHANG == 1 on sunos, presumably same elsewhere.
341                 if os.waitpid(self.pid, os.WNOHANG):
342                     if self.expire_noisily:
343                         print(("\n(%s subproc %d '%s' / %s)" %
344                                (sig[0], self.pid, self.cmd,
345                                 hex(id(self))[2:])))
346                     for i in self.pipefiles:
347                         try:
348                             os.fdopen(i).close()
349                         except OSError:
350                             pass
351                     del self.pipefiles[:]
352                     self.pid = 0
353                     return None                                         # ===>
354                 time.sleep(.1)
355         # Only got here if subprocess is not gone:
356         raise SubprocessError(
357                "Failed kill of subproc %d, '%s', with signals %s" %
358                (self.pid, self.cmd, map(lambda x: x[0], sigs)))
359
360     def __del__(self):
361         """Terminate the subprocess"""
362         if self.pid:
363             self.die()
364
365     def __repr__(self):
366         status = self.status()
367         return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
368
369
370 ##############################################################################
371 #####              Non-blocking read operations               ##### noqa: E266
372 ##############################################################################
373
374
375 class ReadBuf:
376     """Output buffer for non-blocking reads on selectable files like pipes and
377     sockets.  Init with a file descriptor for the file."""
378
379     def __init__(self, fd, maxChunkSize=1024):
380         """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE
381         (default 1024)."""
382
383         if fd < 0:
384             raise ValueError
385         self.fd = fd
386         self.eof = 0                    # May be set with stuff still in .buf
387         self.buf = ''
388         self.chunkSize = maxChunkSize   # Biggest read chunk, default 1024.
389
390     def fileno(self):
391         return self.fd
392
393     def peekPendingChar(self):
394         """Return, but don't consume, first character of unconsumed output from
395         file, or empty string if none."""
396
397         if self.buf:
398             return self.buf[0]                                          # ===>
399
400         if self.eof:
401             return ''                                                   # ===>
402
403         sel = select.select([self.fd], [], [self.fd], 0)
404         if sel[2]:
405             self.eof = 1
406         if sel[0]:
407             self.buf = os.read(self.fd, self.chunkSize)                 # ===>
408             return self.buf[0]          # Assume select don't lie.
409         else:
410             return ''                                                 # ===>
411
412     def readPendingChar(self):
413         """Consume first character of unconsumed output from file, or empty
414         string if none."""
415
416         if self.buf:
417             got, self.buf = self.buf[0], self.buf[1:]
418             return got                                                  # ===>
419
420         if self.eof:
421             return ''                                                   # ===>
422
423         sel = select.select([self.fd], [], [self.fd], 0)
424         if sel[2]:
425             self.eof = 1
426         if sel[0]:
427             return os.read(self.fd, 1)                                  # ===>
428         else:
429             return ''                                                 # ===>
430
431     def readPendingChars(self, max=None):
432         """Consume uncomsumed output from FILE, or empty string if nothing
433         pending."""
434
435         got = ""
436         if self.buf:
437             if (max > 0) and (len(self.buf) > max):
438                 got = self.buf[0:max]
439                 self.buf = self.buf[max:]
440             else:
441                 got, self.buf = self.buf, ''
442             return got
443
444         if self.eof:
445             return ''
446
447         sel = select.select([self.fd], [], [self.fd], 0)
448         if sel[2]:
449             self.eof = 1
450         if sel[0]:
451             got = got + os.read(self.fd, self.chunkSize)
452             if max == 0:
453                 self.buf = got
454                 return ''
455             elif max is None:
456                 return got
457             elif len(got) > max:
458                 self.buf = self.buf + got[max:]
459                 return got[:max]
460             else:
461                 return got
462         else:
463             return ''
464
465     def readPendingLine(self, block=0):
466         """Return pending output from FILE, up to first newline (inclusive).
467
468         Does not block unless optional arg BLOCK is true.
469
470         Note that an error will be raised if a new eof is encountered without
471         any newline."""
472
473         if self.buf:
474             to = self.buf.find('\n')
475             if to != -1:
476                 got, self.buf = self.buf[:to+1], self.buf[to+1:]
477                 return got                                              # ===>
478             got, self.buf = self.buf, ''
479         else:
480             if self.eof:
481                 return ''                                               # ===>
482             got = ''
483
484         # Herein, 'got' contains the (former) contents of the buffer, and it
485         # doesn't include a newline.
486         fdlist = [self.fd]
487         period = block and 1.0 or 0     # don't be too busy while waiting
488         while 1:                        # (we'll only loop if block set)
489             sel = select.select(fdlist, [], fdlist, period)
490             if sel[2]:
491                 self.eof = 1
492             if sel[0]:
493                 got = got + os.read(self.fd, self.chunkSize)
494
495             to = got.find('\n')
496             if to != -1:
497                 got, self.buf = got[:to+1], got[to+1:]
498                 return got                                              # ===>
499             if not block:
500                 return got                                              # ===>
501             if self.eof:
502                 self.buf = ''           # this is how an ordinary file acts...
503                 return got
504             # otherwise - no newline, blocking requested, no eof - loop. # ==^
505
506     def readline(self):
507         """Return next output line from file, blocking until it is received."""
508
509         return self.readPendingLine(1)                                  # ===>
510
511
512 #############################################################################
513 #####            Encapsulated reading and writing            ##### noqa: E266
514 #############################################################################
515 # Encapsulate messages so the end can be unambiguously identified, even
516 # when they contain multiple, possibly empty lines.
517
518
519 class RecordFile:
520     """Encapsulate stream object for record-oriented IO.
521
522     Particularly useful when dealing with non-line oriented communications
523     over pipes, eg with subprocesses."""
524
525     # Message is written preceded by a line containing the message length.
526
527     def __init__(self, f):
528         self.file = f
529
530     def write_record(self, s):
531         "Write so self.read knows exactly how much to read."
532         f = self.__dict__['file']
533         f.write("%s\n%s" % (len(s), s))
534         if hasattr(f, 'flush'):
535             f.flush()
536
537     def read_record(self):
538         "Read and reconstruct message as prepared by self.write."
539         f = self.__dict__['file']
540         line = f.readline()[:-1]
541         if line:
542             try:
543                 l = int(line)
544             except ValueError:
545                 raise IOError(("corrupt %s file structure"
546                                % self.__class__.__name__))
547             return f.read(l)
548         else:
549             # EOF.
550             return ''
551
552     def __getattr__(self, attr):
553         """Implement characteristic file object attributes."""
554         f = self.__dict__['file']
555         if hasattr(f, attr):
556             return getattr(f, attr)
557         else:
558             raise AttributeError(attr)
559
560     def __repr__(self):
561         return "<%s of %s at %s>" % (self.__class__.__name__,
562                                      self.__dict__['file'],
563                                      hex(id(self))[2:])
564
565
566 def record_trial(s):
567     """Exercise encapsulated write/read with an arbitrary string.
568
569     Raise IOError if the string gets distorted through transmission!"""
570     from StringIO import StringIO
571     sf = StringIO()
572     c = RecordFile(sf)
573     c.write(s)
574     c.seek(0)
575     r = c.read()
576     show = " start:\t %s\n end:\t %s\n" % (repr(s), repr(r))
577     if r != s:
578         raise IOError("String distorted:\n%s" % show)
579
580
581 #############################################################################
582 #####            An example subprocess interfaces            ##### noqa: E266
583 #############################################################################
584
585
586 class Ph:
587     """Convenient interface to CCSO 'ph' nameserver subprocess.
588
589     .query('string...') method takes a query and returns a list of dicts, each
590     of which represents one entry."""
591
592     # Note that i made this a class that handles a subprocess object, rather
593     # than one that inherits from it.  I didn't see any functional
594     # disadvantages, and didn't think that full support of the entire
595     # Subprocess functionality was in any way suitable for interaction with
596     # this specialized interface.  ?  klm 13-Jan-1995
597
598     def __init__(self):
599         try:
600             self.proc = Subprocess('ph', 1)
601         except:
602             raise SubprocessError('failure starting ph: %s' %         # ===>
603                                   str(sys.exc_value))
604
605     def query(self, q):
606         """Send a query and return a list of dicts for responses.
607
608         Raise a ValueError if ph responds with an error."""
609
610         self.clear()
611
612         self.proc.writeline('query ' + q)
613         got = []
614         it = {}
615         while 1:
616             response = self.getreply()  # Should get null on new prompt.
617             errs = self.proc.readPendingErrChars()
618             if errs:
619                 sys.stderr.write(errs)
620             if it:
621                 got.append(it)
622                 it = {}
623             if not response:
624                 return got                                              # ===>
625             elif isinstance(response, str):
626                 raise ValueError("ph failed match: '%s'" % response)    # ===>
627             for line in response:
628                 # convert to a dict:
629                 line = line.split(':')
630                 it[line.strip([0])] = (''.join(line[1:])).strip()
631
632     def getreply(self):
633         """Consume next response from ph, returning list of lines or string
634         err."""
635         # Key on first char:  (First line may lack newline.)
636         #  - dash       discard line
637         #  - 'ph> '     conclusion of response
638         #  - number     error message
639         #  - whitespace beginning of next response
640
641         nextChar = self.proc.waitForPendingChar(60)
642         if not nextChar:
643             raise SubprocessError('ph subprocess not responding')       # ===>
644         elif nextChar == '-':
645             # dashed line - discard it, and continue reading:
646             self.proc.readline()
647             return self.getreply()                                      # ===>
648         elif nextChar == 'p':
649             # 'ph> ' prompt - don't think we should hit this, but what the hay:
650             return ''                                                   # ===>
651         elif nextChar in '0123456789':
652             # Error notice - we're currently assuming single line errors:
653             return self.proc.readline()[:-1]                            # ===>
654         elif nextChar in ' \t':
655             # Get content, up to next dashed line:
656             got = []
657             while nextChar != '-' and nextChar != '':
658                 got.append(self.proc.readline()[:-1])
659                 nextChar = self.proc.peekPendingChar()
660             return got
661
662     def __repr__(self):
663         return "<Ph instance, %s at %s>\n" % (self.proc.status(),
664                                               hex(id(self))[2:])
665
666     def clear(self):
667         """Clear-out initial preface or residual subproc input and output."""
668         pause = .5
669         maxIter = 10        # 5 seconds to clear
670         iterations = 0
671         got = ''
672         self.proc.write('')
673         while iterations < maxIter:
674             got = got + self.proc.readPendingChars()
675             # Strip out all but the last incomplete line:
676             got = got.split('\n')[-1]
677             if got == 'ph> ':
678                 return    # Ok.                             ===>
679             time.sleep(pause)
680         raise SubprocessError('ph not responding within %s secs' %
681                               pause * maxIter)
682
683
684 ##############################################################################
685 #####                          Test                           ##### noqa: E266
686 ##############################################################################
687
688
689 def test(p=0):
690     print("\tOpening bogus subprocess, should fail:")
691     try:
692         Subprocess('/', 1)
693         print("\tOops!  Null-named subprocess startup *succeeded*?!?")
694     except SubprocessError:
695         print("\t...yep, it failed.")
696     print("\tOpening cat subprocess:")
697     p = Subprocess('cat', 1)            # set to expire noisily...
698     print(p)
699     print('\tWrite, then read, two newline-teriminated lines, using readline:')
700     p.write('first full line written\n')
701     p.write('second.\n')
702     print(repr(p.readline()))
703     print(repr(p.readline()))
704     print('\tThree lines, last sans newline, read using combination:')
705     p.write('first\n')
706     p.write('second\n')
707     p.write('third, (no cr)')
708     print('\tFirst line via readline:')
709     print(repr(p.readline()))
710     print('\tRest via readPendingChars:')
711     print(p.readPendingChars())
712     print("\tStopping then continuing subprocess (verbose):")
713     if not p.stop(1):                   # verbose stop
714         print('\t** Stop seems to have failed!')
715     else:
716         print('\tWriting line while subprocess is paused...')
717         p.write('written while subprocess paused\n')
718         print('\tNonblocking read of paused subprocess (should be empty):')
719         print(p.readPendingChars())
720         print('\tContinuing subprocess (verbose):')
721         if not p.cont(1):               # verbose continue
722             print(
723                 '\t** Continue seems to have failed!  Probly lost subproc...')
724             return p
725         else:
726             print('\tReading accumulated line, blocking read:')
727             print(p.readline())
728             print("\tDeleting subproc, which was set to die noisily:")
729             del p
730             print("\tDone.")
731             return None
732
733
734 if __name__ == '__main__':
735     test()