+++ /dev/null
-"""Base class for robots
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman <phd@phdru.name>"
-__copyright__ = "Copyright (C) 2000-2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_base', 'get_error']
-
-
-from base64 import b64encode
-from urllib.parse import urlsplit, urljoin
-import sys
-import socket
-import time
-
-from m_lib.md5wrapper import md5wrapper
-from m_lib.net.www.util import parse_time
-
-from bkmk_objects import Robot
-from parse_html import parse_html
-
-
# Fake headers to pretend this is a real browser.
# The parentheses matter: without them the second literal is a separate
# no-op expression statement and the User-Agent value is silently truncated.
_user_agent = (
    "Mozilla/5.0 (X11; U; Linux 2.6 i686; en)"
    " Gecko/20001221 Firefox/2.0.0"
)
# Identifies the real client: program name plus the running Python version.
_x_user_agent = "bookmarks_db (Python %d.%d.%d)" % sys.version_info[:3]

# Header set built from the two agent strings above.
request_headers = {
    'Accept': '*/*',
    'Accept-Language': 'ru,en',
    'Cache-Control': 'max-age=300',
    'Connection': 'close',
    'Referer': '/',
    'User-Agent': _user_agent,
    'X-User-Agent': _x_user_agent,
}
-
-
# Tag written into ``bookmark.moved`` for each kind of redirect: keyed by
# HTTP status code, plus the pseudo-code "html" that check_url passes for
# a refresh found by the HTML parser.
reloc_dict = dict([
    (301, "perm1."),
    (302, "temp2."),
    (303, "temp3."),
    (307, "temp7."),
    (308, "temp8."),
    ("html", "html"),
])
-
-
def get_error(e):
    """Format an error value as a single-line string.

    A string is returned unchanged.  An exception is formatted from its
    ``args``; any other iterable is formatted from its items.  Every item
    is converted with ``str()``, newlines are escaped as a literal
    backslash-n, the item is wrapped in single quotes, and the quoted
    items are joined with spaces inside one pair of parentheses.
    """
    if isinstance(e, str):
        return e

    # Exception instances are not iterable in Python 3; their payload
    # lives in ``args``.
    items = e.args if isinstance(e, BaseException) else e
    quoted = ["'%s'" % str(item).replace('\n', "\\n") for item in items]
    return "(%s)" % ' '.join(quoted)
-
-
# Module-wide icon cache keyed by icon URL.  A value is either a tuple
# (content type, icon data) or None when no usable icon was found there.
icons = dict()
-
-
class robot_base(Robot):
    """Base class for robots: the shared URL-checking logic.

    The methods below call ``self.get()``, ``self.get_ftp_welcome()`` and
    ``self.log()``, none of which is defined in this class; presumably they
    are supplied by ``Robot`` or by concrete subclasses.
    """

    # Socket timeout in seconds; __init__ installs it as the process-wide
    # default via socket.setdefaulttimeout().
    timeout = 60

    def __init__(self, *args, **kw):
        """Initialise the base ``Robot`` and set the default socket timeout."""
        Robot.__init__(self, *args, **kw)
        socket.setdefaulttimeout(int(self.timeout))

    def check_url(self, bookmark):
        """Fetch ``bookmark.href`` and record the findings on *bookmark*.

        Depending on the response this sets ``error``, ``moved``, ``size``,
        ``last_modified``, ``charset``, ``real_title``, ``icon``,
        ``icon_href`` and ``md5``.  ``finish_check_url`` always runs at the
        end and stamps ``last_tested`` and ``test_time``.

        Returns 1 when the bookmark was tested (whether or not the check
        succeeded) and 0 when the check was interrupted with ^C.
        """
        try:
            self.start = int(time.time())
            bookmark.icon = None

            split_results = urlsplit(bookmark.href)
            url_type = split_results.scheme
            url_path = split_results.path
            url_host = split_results.hostname

            # NOTE(review): only scheme, hostname and path are used here, so
            # any port, credentials, query string or fragment in
            # bookmark.href is not part of the URL passed to get() --
            # confirm this is intended.
            url = "%s://%s%s" % (url_type, url_host, url_path)
            error, redirect_code, redirect_to, headers, content = \
                self.get(bookmark, url, True)

            if error:
                bookmark.error = error
                return 1

            if redirect_code:
                self.set_redirect(bookmark, redirect_code, redirect_to)
                return 1

            size = 0
            last_modified = None

            if headers:
                try:
                    size = headers["Content-Length"]
                except KeyError:
                    pass

                try:
                    last_modified = headers["Last-Modified"]
                except KeyError:
                    pass

                if last_modified:
                    last_modified = parse_time(last_modified)

            if not size:  # Could be None from headers
                size = len(content)

            if last_modified:
                last_modified = str(int(last_modified))
            else:
                last_modified = bookmark.last_visit

            bookmark.size = size
            bookmark.last_modified = last_modified

            charset = None
            if headers:
                # A KeyError raised anywhere in this section (a missing
                # Content-Type on the page or on the icon response) is
                # reported as "no header" and ends content processing.
                try:
                    content_type = headers["Content-Type"]
                    self.log(" Content-Type : %s" % content_type)
                    if content_type is None:
                        # No declared type: guess from the body.
                        if b'html' in content.lower():
                            content_type = 'text/html'
                        else:
                            content_type = 'text/plain'
                        self.log(" Set Content-Type to: %s"
                                 % content_type)
                    try:
                        # extract charset from
                        # "text/html; charset=UTF-8, foo; bar"
                        content_type, charset = content_type.split(';', 1)
                        content_type = content_type.strip()
                        charset = charset.split('=')[1].strip().split(',')[0]
                        self.log(" HTTP charset : %s" % charset)
                    except (ValueError, IndexError):
                        charset = None
                        self.log(" no charset in Content-Type header")

                    is_html = content_type.startswith(
                        ("text/html", "application/xhtml+xml"))

                    content_stripped = content.strip()
                    if content_stripped and charset:
                        try:
                            content_stripped = content_stripped.decode(
                                charset, 'replace')
                        except LookupError:
                            charset = None
                            self.log(" unknown charset "
                                     "in Content-Type header")

                    if content_stripped and is_html:
                        parser = parse_html(
                            content_stripped, charset, self.log)
                        # The HTTP charset wins over the one found in HTML.
                        if charset:
                            bookmark.charset = charset
                        elif parser and parser.meta_charset:
                            bookmark.charset = parser.meta_charset
                        if parser:
                            bookmark.real_title = parser.title
                            icon = parser.icon
                        else:
                            icon = None
                        if not icon:
                            icon = "/favicon.ico"
                        self._check_icon(bookmark, urljoin(url, icon))
                        if parser and parser.refresh:
                            self._check_refresh(bookmark, parser.refresh)
                    elif charset:
                        bookmark.charset = charset

                    if not content_stripped:
                        self.log(" empty response, no content")
                    if not is_html:
                        self.log(" not html")
                except KeyError as key:
                    self.log(" no header: %s" % key)

            md5 = md5wrapper()
            if url_type == "ftp":  # Pass welcome message through MD5
                ftp_welcome = self.get_ftp_welcome()
                if not isinstance(ftp_welcome, bytes):
                    ftp_welcome = ftp_welcome.encode(charset or 'utf-8')
                md5.update(ftp_welcome)

            if isinstance(content, bytes):
                md5.update(content)
            else:
                md5.update(content.encode(charset or 'utf-8'))
            bookmark.md5 = str(md5)

        except EOFError:
            bookmark.error = "Unexpected EOF (FTP server closed connection)"
            self.log(' EOF: %s' % bookmark.error)

        except KeyboardInterrupt:
            self.log("Keyboard interrupt (^C)")
            return 0

        except socket.error as e:
            bookmark.error = get_error(e)
            self.log(bookmark.error)

        except Exception:
            # Last-resort boundary: print the traceback and mark the
            # bookmark, but let SystemExit and friends propagate.
            import traceback
            traceback.print_exc()
            bookmark.error = "Exception!"
            self.log(' Exception: %s' % bookmark.error)

        finally:
            self.finish_check_url(bookmark)

        # Tested
        return 1

    def _check_icon(self, bookmark, icon_url):
        """Attach the icon at *icon_url* to *bookmark*, using the icon cache.

        On success ``bookmark.icon_href`` is set to *icon_url* and
        ``bookmark.icon`` to a ``data:`` URI.  The outcome, including a
        failure (stored as None), is remembered in the module-level
        ``icons`` cache.  A KeyError from a missing Content-Type header on
        the icon response is deliberately left to the caller.
        """
        self.log(" looking for icon at: %s" % icon_url)

        if icon_url in icons:
            cached = icons[icon_url]
            if cached:
                bookmark.icon_href = icon_url
                content_type, bookmark.icon = cached
                self.log(" cached icon: %s" % content_type)
            else:
                self.log(" cached icon: no icon")
            return

        if icon_url.startswith('data:'):
            # The icon is embedded in the page; nothing to download.
            content_type, _icon_data = \
                icon_url[len('data:'):].split(',', 1)
            bookmark.icon_href = bookmark.icon = icon_url
            self.log(" got data icon : %s" % content_type)
            icons[icon_url] = (content_type, icon_url)
            return

        try:
            _icon_url = icon_url
            # Follow at most 8 redirects before giving up.
            for _attempt in range(8):
                _error, icon_redirect_code, icon_redirect_to, \
                    icon_headers, icon_data = self.get(bookmark, _icon_url)
                if icon_redirect_code:
                    _icon_url = icon_redirect_to
                    self.log(" redirect to : %s" % _icon_url)
                else:
                    if icon_data is None:
                        raise IOError("No icon")
                    break
            else:
                raise IOError("Too many redirects")
        except Exception as exc:
            # ``Exception`` rather than a bare except, so that ^C reaches
            # the KeyboardInterrupt handler in check_url.
            self.log(" no icon : %s %s" % (type(exc), exc))
            icons[icon_url] = None
            return

        content_type = icon_headers["Content-Type"]
        if content_type and content_type.startswith(
                ("application/", "image/", "text/plain")):
            bookmark.icon_href = icon_url
            self.log(" got icon : %s" % content_type)
            if content_type.startswith(("application/", "text/plain")):
                self.log(" non-image content type,"
                         " assume x-icon")
                content_type = 'image/x-icon'
            if not isinstance(icon_data, bytes):
                icon_data = icon_data.encode('latin1')
            # b64encode() returns bytes; decode so the data URI does not
            # contain a b'...' repr.
            bookmark.icon = "data:%s;base64,%s" % (
                content_type, b64encode(icon_data).decode('ascii'))
            icons[icon_url] = (content_type, bookmark.icon)
        else:
            self.log(" no icon : "
                     "bad content type '%s'"
                     % content_type)
            icons[icon_url] = None

    def _check_refresh(self, bookmark, refresh):
        """Record the refresh value *refresh* as an "html" redirect.

        *refresh* is split on ';' for the delay and on '=' for the target
        URL; a missing target is reported as "self".
        """
        try:
            url = refresh.split('=', 1)[1]
        except IndexError:
            url = "self"

        delay_text = refresh.split(';')[0]
        try:
            delay = float(delay_text)
        except (IndexError, ValueError):
            self.set_redirect(bookmark, "html",
                              "Bad redirect to %s (%s)" % (url, refresh))
            return

        try:
            delay = int(delay_text)
        except ValueError:
            pass  # float timeout
        self.set_redirect(bookmark, "html", "%s (%s sec)" % (url, delay))

    def set_redirect(self, bookmark, errcode, newurl):
        """Store the redirect on ``bookmark.moved`` and log it.

        *errcode* must be a key of ``reloc_dict``; an unknown code raises
        KeyError.
        """
        bookmark.moved = moved = "(%s) to %s" % (reloc_dict[errcode], newurl)
        # Log the text itself: formatting an encoded bytes object with %s
        # would put its b'...' repr into the log.
        self.log(' Moved: %s' % moved)

    def finish_check_url(self, bookmark):
        """Stamp *bookmark* with the check start time and its duration.

        Both values are stored as strings of whole seconds, measured from
        the ``self.start`` recorded by check_url.
        """
        start = self.start
        bookmark.last_tested = str(start)
        now = int(time.time())
        bookmark.test_time = str(now - start)