--- /dev/null
+"""Robot based on PycURL
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_curl']
+
+
+import certifi
+import pycurl
+
+from Robots.bkmk_robot_base import robot_base, request_headers
+
+
+class robot_curl(robot_base):
+ def get(self, bookmark, url, accept_charset=False, use_proxy=False):
+ if accept_charset and bookmark.charset:
+ headers = request_headers.copy()
+ headers['Accept-Charset'] = bookmark.charset
+ else:
+ headers = request_headers
+ headers = ['%s: %s' % (k, v) for k, v in headers.items()]
+
+ curl = pycurl.Curl()
+ self.headers = {}
+ self.body = b''
+
+ # Do not follow redirects
+ curl.setopt(pycurl.FOLLOWLOCATION, 0)
+ # Verify that we've got the right site; harmless on a non-SSL connect.
+ curl.setopt(pycurl.SSL_VERIFYHOST, 2)
+ curl.setopt(curl.CAINFO, certifi.where())
+ # Set timeouts to avoid hanging too long
+ curl.setopt(pycurl.CONNECTTIMEOUT, 30)
+ curl.setopt(pycurl.TIMEOUT, 60)
+ # Parse Last-Modified
+ curl.setopt(pycurl.OPT_FILETIME, 1)
+
+ if use_proxy:
+ curl.setopt(pycurl.PROXY, self.proxy)
+
+ # Set up a callback to capture the headers and the body
+ curl.setopt(pycurl.HEADERFUNCTION, self.header_callback)
+ curl.setopt(pycurl.WRITEFUNCTION, self.body_callback)
+
+ curl.setopt(pycurl.HTTPGET, 1)
+ curl.setopt(pycurl.HTTPHEADER, headers)
+ curl.setopt(pycurl.URL, url)
+ try:
+ curl.perform()
+ except pycurl.error as e:
+ error = str(e)
+ return error, None, None, None, None
+
+ status = curl.getinfo(pycurl.HTTP_CODE)
+ curl.close()
+
+ if status >= 400:
+ return "Error %d" % status, status, None, None, None
+ if status >= 300:
+ return None, status, self.headers['Location'], None, None
+ return None, None, None, self.headers, self.body
+
+ def header_callback(self, data):
+ for encoding in 'ascii', 'latin1', 'utf-8':
+ try:
+ data = data.decode(encoding)
+ except UnicodeDecodeError:
+ pass
+ else:
+ break
+ else:
+ print("Error decoding header:", data)
+ return
+ if ':' in data:
+ key, value = data.split(':', 1)
+ self.headers[key.title()] = value.strip()
+
+ def body_callback(self, data):
+ self.body += data
+
+ def get_ftp_welcome(self):
+ return '' # We doen't store welcome message yet