"""
__author__ = "Oleg Broytman <phd@phdru.name>"
-__copyright__ = "Copyright (C) 2000-2014 PhiloSoft Design"
+__copyright__ = "Copyright (C) 2000-2023 PhiloSoft Design"
__license__ = "GNU GPL"
__all__ = ['robot_base', 'get_error']
-import sys
-import time, urllib
from base64 import b64encode
+import sys
+import socket
+import time
+import urllib
from urlparse import urljoin
-from m_lib.net.www.util import parse_time
from m_lib.md5wrapper import md5wrapper
+from m_lib.net.www.util import parse_time
from bkmk_objects import Robot
from parse_html import parse_html
-class RedirectException(Exception):
- reloc_dict = {
- 301: "perm.",
- 302: "temp2.",
- 303: "temp3.",
- 307: "temp7.",
- "html": "html"
- }
- def __init__(self, errcode, newurl):
- Exception.__init__(self, "(%s) to %s" % (self.reloc_dict[errcode], newurl))
- self.url = newurl
+# Short labels for each kind of redirect, keyed by the code passed to
+# robot_base.set_redirect(): HTTP status codes, plus "html" for a refresh
+# directive reported by the HTML parser. The label is what appears in
+# parentheses in bookmark.moved.
+reloc_dict = {
+ 301: "perm.",
+ 302: "temp2.",
+ 303: "temp3.",
+ 307: "temp7.",
+ "html": "html"
+}
-def get_error(msg):
- if isinstance(msg, str):
- return msg
- else:
- s = []
- for i in msg:
- s.append("'%s'" % str(i).replace('\n', "\\n"))
- return "(%s)" % ' '.join(s)
+def get_error(e):
+    """Return a one-line, printable description of an error.
+
+    e is either a ready-made message string, which is returned unchanged,
+    or an exception / sequence of values.  In the latter case every item
+    is quoted, embedded newlines are escaped as a literal "\\n", and the
+    items are joined with spaces inside parentheses, e.g.
+    "('110' 'Connection timed out')".
+    """
+    if isinstance(e, str):
+        return e
+    # Iterate an exception's args rather than the exception itself: the
+    # result is the same on Python 2, and exceptions are not iterable on
+    # Python 3.  Plain tuples/lists have no .args and are used as they are.
+    items = getattr(e, "args", e)
+    return "(%s)" % ' '.join(
+        "'%s'" % str(i).replace('\n', "\\n") for i in items)
-icons = {} # Icon cache; maps URL to a tuple (content type, data)
- # or None if there is no icon.
+# Icon cache; maps URL to a tuple (content type, data)
+# or None if there is no icon.
+icons = {}
+
class robot_base(Robot):
- def check_url(self, bookmark):
- try:
- self.start = int(time.time())
- bookmark.icon = None
+ timeout = 60
+
+    def __init__(self, *args, **kw):
+        """Initialise the base Robot, then install self.timeout globally."""
+        Robot.__init__(self, *args, **kw)
+        # The default timeout is process-wide: it applies to every socket
+        # created after this call, not only to this robot's connections.
+        seconds = int(self.timeout)
+        socket.setdefaulttimeout(seconds)
+
+ def check_url(self, bookmark):
+ """Check one bookmark's URL and record the outcome on the bookmark.
+
+ Fetches bookmark.href (without its #fragment) through self.get() and
+ stores the result on the bookmark: error or moved when the fetch
+ fails or is redirected; size, last_modified and md5 otherwise; and,
+ for HTML pages, real_title plus icon/icon_href when an icon is found.
+
+ Returns 1 when the bookmark was tested (whatever the outcome) and 0
+ when interrupted from the keyboard. finish_check_url() always runs.
+ """
+ try:
+ self.start = int(time.time())
+ bookmark.icon = None
+
+ url_type, url_rest = urllib.splittype(bookmark.href)
+ url_host, url_path = urllib.splithost(url_rest)
+ url_path, url_tag = urllib.splittag(url_path) # noqa: E221
+ # multiple spaces before operator
- url_type, url_rest = urllib.splittype(bookmark.href)
- url_host, url_path = urllib.splithost(url_rest)
- url_path, url_tag = urllib.splittag(url_path)
+ url = "%s://%s%s" % (url_type, url_host, url_path)
+ error, redirect_code, redirect_to, headers, content = self.get(bookmark, url, True)
- url = "%s://%s%s" % (url_type, url_host, url_path)
- headers, content, error = self.urlretrieve(bookmark, url, True)
+ if error:
+ bookmark.error = error
+ return 1
- if error:
- bookmark.error = error
+ if redirect_code:
+ self.set_redirect(bookmark, redirect_code, redirect_to)
+ return 1
- if content is None:
- return 1
+ size = 0
+ last_modified = None
- size = 0
- last_modified = None
+ if headers:
+ try:
+ size = headers["Content-Length"]
+ except KeyError:
+ size = len(content)
- if headers:
- try:
- size = headers["Content-Length"]
- except KeyError:
- size = len(content)
+ try:
+ last_modified = headers["Last-Modified"]
+ except KeyError:
+ pass
- try:
- last_modified = headers["Last-Modified"]
- except KeyError:
- pass
+ if last_modified:
+ last_modified = parse_time(last_modified)
+ else:
+ size = len(content)
 if last_modified:
- last_modified = parse_time(last_modified)
- else:
- size = len(content)
-
- if last_modified:
- last_modified = str(int(last_modified))
- else:
- last_modified = bookmark.last_visit
-
- bookmark.size = size
- bookmark.last_modified = last_modified
-
- md5 = md5wrapper()
- if urllib._urlopener.type == "ftp": # Pass welcome message through MD5
- md5.update(self.get_ftp_welcome())
-
- md5.update(content)
- bookmark.md5 = str(md5)
-
- if headers:
- try:
- content_type = headers["Content-Type"]
- self.log(" Content-Type: %s" % content_type)
- try:
- # extract charset from "text/html; foo; charset=UTF-8, bar; baz;"
- content_type, charset = content_type.split(';', 1)
- content_type = content_type.strip()
- charset = charset.split('=')[1].strip().split(',')[0]
- self.log(" HTTP charset : %s" % charset)
- except (ValueError, IndexError):
- charset = None
- self.log(" no charset in Content-Type header")
- for ctype in ("text/html", "application/xhtml+xml"):
- if content_type.startswith(ctype):
- html = True
- break
- else:
- html = False
- if html:
- parser = parse_html(content, charset, self.log)
- if parser:
- bookmark.real_title = parser.title
- icon = parser.icon
- else:
- icon = None
- if not icon:
- icon = "/favicon.ico"
- icon_url = urljoin("%s://%s%s" % (url_type, url_host, url_path), icon)
- self.log(" looking for icon at: %s" % icon_url)
- if icon_url in icons:
- if icons[icon_url]:
- bookmark.icon_href = icon_url
- content_type, bookmark.icon = icons[icon_url]
- self.log(" cached icon: %s" % content_type)
- else:
- self.log(" cached icon: no icon")
- else:
- try:
- _icon_url = icon_url
- for i in range(8):
- try:
- icon_headers, icon_data, error = self.urlretrieve(bookmark, _icon_url)
- except RedirectException, e:
- _icon_url = e.url
- self.log(" redirect to : %s" % _icon_url)
- else:
- if icon_data is None:
- raise IOError("No icon")
- break
+ last_modified = str(int(last_modified))
+ else:
+ last_modified = bookmark.last_visit
+
+ bookmark.size = size
+ bookmark.last_modified = last_modified
+
+ md5 = md5wrapper()
+ if url_type == "ftp": # Pass welcome message through MD5
+ md5.update(self.get_ftp_welcome())
+
+ md5.update(content)
+ bookmark.md5 = str(md5)
+
+ if headers:
+ try:
+ content_type = headers["Content-Type"]
+ self.log(" Content-Type: %s" % content_type)
+ try:
+ # extract charset from "text/html; foo; charset=UTF-8, bar; baz;"
+ content_type, charset = content_type.split(';', 1)
+ content_type = content_type.strip()
+ charset = charset.split('=')[1].strip().split(',')[0]
+ self.log(" HTTP charset : %s" % charset)
+ except (ValueError, IndexError):
+ charset = None
+ self.log(" no charset in Content-Type header")
+ for ctype in ("text/html", "application/xhtml+xml"):
+ if content_type.startswith(ctype):
+ html = True
+ break
+ else:
+ html = False
+ if html:
+ parser = parse_html(content, charset, self.log)
+ if parser:
+ bookmark.real_title = parser.title
+ icon = parser.icon
 else:
- raise IOError("Too many redirects")
- except:
- etype, emsg, tb = sys.exc_info()
- self.log(" no icon : %s %s" % (etype, emsg))
- etype = emsg = tb = None
- icons[icon_url] = None
- else:
- content_type = icon_headers["Content-Type"]
- if content_type.startswith("application/") \
- or content_type.startswith("image/") \
- or content_type.startswith("text/plain"):
- bookmark.icon_href = icon_url
- self.log(" got icon : %s" % content_type)
- if content_type.startswith("application/") \
- or content_type.startswith("text/plain"):
- self.log(" non-image content type, assume x-icon")
- content_type = 'image/x-icon'
- bookmark.icon = "data:%s;base64,%s" % (content_type, b64encode(icon_data))
- icons[icon_url] = (content_type, bookmark.icon)
+ icon = None
+ if not icon:
+ icon = "/favicon.ico"
+ icon_url = urljoin("%s://%s%s" % (url_type, url_host, url_path), icon)
+ self.log(" looking for icon at: %s" % icon_url)
+ if icon_url in icons:
+ if icons[icon_url]:
+ bookmark.icon_href = icon_url
+ content_type, bookmark.icon = icons[icon_url]
+ self.log(" cached icon: %s" % content_type)
+ else:
+ self.log(" cached icon: no icon")
 else:
- self.log(" no icon : bad content type '%s'" % content_type)
- icons[icon_url] = None
- if parser and parser.refresh:
- refresh = parser.refresh
- try:
- url = refresh.split('=', 1)[1]
- except IndexError:
- url = "self"
- try:
- timeout = float(refresh.split(';')[0])
- except (IndexError, ValueError):
- raise RedirectException("html", "Bad redirect to %s (%s)" % (url, refresh))
- else:
- try:
- timeout = int(refresh.split(';')[0])
- except ValueError:
- pass # float timeout
- raise RedirectException("html", "%s (%s sec)" % (url, timeout))
-
- except KeyError, key:
- self.log(" no header: %s" % key)
-
- except EOFError:
- bookmark.error = "Unexpected EOF (FTP server closed connection)"
- self.log(' EOF: %s' % bookmark.error)
-
- except RedirectException, msg:
- bookmark.moved = str(msg)
- self.log(' Moved: %s' % bookmark.moved)
-
- except KeyboardInterrupt:
- self.log("Keyboard interrupt (^C)")
- return 0
-
- except:
- import traceback
- traceback.print_exc()
- bookmark.error = "Exception!"
- self.log(' Exception: %s' % bookmark.error)
-
- finally:
- self.finish_check_url(bookmark)
-
- # Tested
- return 1
-
- def finish_check_url(self, bookmark):
- start = self.start
- bookmark.last_tested = str(start)
-
- now = int(time.time())
- bookmark.test_time = str(now - start)
-
- self.cleanup()
+ try:
+ _icon_url = icon_url
+ for i in range(8):
+ error, icon_redirect_code, icon_redirect_to, \
+ icon_headers, icon_data = \
+ self.get(bookmark, _icon_url)
+ if icon_redirect_code:
+ _icon_url = icon_redirect_to
+ self.log(" redirect to : %s" % _icon_url)
+ else:
+ if icon_data is None:
+ raise IOError("No icon")
+ break
+ else:
+ raise IOError("Too many redirects")
+ # Not a bare "except:": a ^C during the icon fetch must reach
+ # the KeyboardInterrupt handler below instead of being logged
+ # and cached as "no icon".
+ except Exception:
+ etype, emsg, tb = sys.exc_info()
+ self.log(" no icon : %s %s" % (etype, emsg))
+ etype = emsg = tb = None
+ icons[icon_url] = None
+ else:
+ content_type = icon_headers["Content-Type"]
+ if content_type.startswith("application/") \
+ or content_type.startswith("image/") \
+ or content_type.startswith("text/plain"):
+ bookmark.icon_href = icon_url
+ self.log(" got icon : %s" % content_type)
+ if content_type.startswith("application/") \
+ or content_type.startswith("text/plain"):
+ self.log(" non-image content type, assume x-icon")
+ content_type = 'image/x-icon'
+ bookmark.icon = "data:%s;base64,%s" % (content_type, b64encode(icon_data))
+ icons[icon_url] = (content_type, bookmark.icon)
+ else:
+ self.log(" no icon : bad content type '%s'" % content_type)
+ icons[icon_url] = None
+ if parser and parser.refresh:
+ refresh = parser.refresh
+ try:
+ url = refresh.split('=', 1)[1]
+ except IndexError:
+ url = "self"
+ try:
+ timeout = float(refresh.split(';')[0])
+ except (IndexError, ValueError):
+ self.set_redirect(bookmark, "html", "Bad redirect to %s (%s)" % (url, refresh))
+ else:
+ try:
+ timeout = int(refresh.split(';')[0])
+ except ValueError:
+ pass # float timeout
+ self.set_redirect(bookmark, "html", "%s (%s sec)" % (url, timeout))
+
+ except KeyError as key:
+ self.log(" no header: %s" % key)
+
+ except EOFError:
+ bookmark.error = "Unexpected EOF (FTP server closed connection)"
+ self.log(' EOF: %s' % bookmark.error)
+
+ except KeyboardInterrupt:
+ self.log("Keyboard interrupt (^C)")
+ return 0
+
+ except socket.error as e:
+ bookmark.error = get_error(e)
+ self.log(bookmark.error)
+
+ except:
+ import traceback
+ traceback.print_exc()
+ bookmark.error = "Exception!"
+ self.log(' Exception: %s' % bookmark.error)
+
+ finally:
+ self.finish_check_url(bookmark)
+
+ # Tested
+ return 1
+
+    def set_redirect(self, bookmark, errcode, newurl):
+        """Record on the bookmark that its URL has moved, and log it.
+
+        errcode is looked up in reloc_dict to get a short label; newurl is
+        the redirect target (or a description of it).  The result is
+        stored in bookmark.moved as "(<label>) to <newurl>".
+        """
+        # A code missing from reloc_dict is shown as-is.  Raising KeyError
+        # here would be caught by check_url's "no header" handler and the
+        # redirect would never be recorded on the bookmark.
+        label = reloc_dict.get(errcode, errcode)
+        bookmark.moved = "(%s) to %s" % (label, newurl)
+        self.log(' Moved: %s' % bookmark.moved)
+
+    def finish_check_url(self, bookmark):
+        """Stamp the bookmark with when its check began and how long it took.
+
+        Reads self.start (set by check_url) and writes bookmark.last_tested
+        and bookmark.test_time, both as strings of whole seconds.
+        """
+        began = self.start
+        elapsed = int(time.time()) - began
+        bookmark.last_tested = str(began)
+        bookmark.test_time = str(elapsed)