1 """Robot based on requests
3 This file is a part of Bookmarks database and Internet robot.
7 __author__ = "Oleg Broytman <phd@phdru.name>"
8 __copyright__ = "Copyright (C) 2024 PhiloSoft Design"
9 __license__ = "GNU GPL"
11 __all__ = ['robot_requests']
14 from urllib.parse import urlsplit
19 from requests.adapters import HTTPAdapter
20 from requests.packages.urllib3.util.ssl_ import create_urllib3_context
24 from Robots.base import robot_base
class robot_requests(robot_base):
    """Robot that retrieves URLs with the ``requests`` library.

    ``ftp://`` URLs are delegated to ``_get_ftp``; every other URL is fetched
    through a ``requests`` session whose ``https://`` adapter is
    ``AllCiphersAdapter``.  ``self.timeout`` and ``self.proxy`` are read here
    but assigned elsewhere (presumably by ``robot_base`` -- confirm).
    """

    def version_str(self):
        """Return the robot's version string, which embeds urllib3's version."""
        return 'python-requests urllib3/%s' % urllib3.__version__

    async def get(self, url, req_headers, use_proxy=False):
        """Fetch *url* and return ``(error, status, headers, body)``.

        Every return path yields exactly four items:

        * failure: ``(error, None, None, None)`` with *error* a string;
        * FTP success: ``(None, None, None, listing)``; the server greeting
          is stored in ``self.welcome``;
        * HTTP(S) success: ``(None, status_code, headers, content)``.

        Redirects are not followed (``allow_redirects=False``).  When
        *use_proxy* is true, ``self.proxy`` is used for both ``http`` and
        ``https``.
        """
        if url.startswith('ftp://'):
            error, welcome, body = _get_ftp(url, self.timeout)
            if error is not None:
                # Keep the same 4-tuple shape as every other return path; a
                # fifth item here breaks callers that unpack four values.
                return error, None, None, None
            self.welcome = welcome
            return None, None, None, body

        # NOTE(review): the use_proxy switch was rebuilt from the parameter
        # name and the visible proxies mapping -- confirm.
        proxies = (
            {'http': self.proxy, 'https': self.proxy} if use_proxy else None
        )

        # Use the session as a context manager so its adapters and their
        # connection pools are released once the response has been read.
        with requests.Session() as session:
            session.mount('https://', AllCiphersAdapter())
            try:
                # NOTE(review): verify=False is assumed from the adapter's
                # cert_reqs=0 contexts and from the module-level filter for
                # "Unverified HTTPS request" warnings -- confirm.
                r = session.get(url, headers=req_headers,
                                timeout=self.timeout,
                                allow_redirects=False, proxies=proxies,
                                verify=False)
            except requests.RequestException as e:
                # NOTE(review): the error text is assumed to be str(e).
                return str(e), None, None, None

            return None, r.status_code, r.headers, r.content

    def get_ftp_welcome(self):
        """Return the greeting stored by the last FTP fetch and clear it."""
        welcome = self.welcome
        # NOTE(review): resetting to '' (rather than None) is an assumption;
        # only the read of self.welcome is certain -- confirm.
        self.welcome = ''
        return welcome
64 def _get_ftp(url, timeout):
65 split_results = urlsplit(url)
66 path = split_results.path or '/'
67 user = split_results.username or 'anonymous'
68 password = split_results.password or 'ftp'
69 host = split_results.hostname
70 port = split_results.port or 21
74 ftp.connect(host, port, timeout)
75 ftp.login(user, password)
78 except (socket.error, ftplib.Error) as e:
79 return "Error: %s" % e, None, None
84 except ftplib.error_perm as e:
85 if str(e).startswith("550 No files found"):
86 return None, ftp.welcome, ''
87 return "Error: %s" % e, None, None
88 except ftplib.Error as e:
89 return "Error: %s" % e, None, None
91 return None, ftp.welcome, '\n'.join(files)
# See https://lukasa.co.uk/2017/02/Configuring_TLS_With_Requests/

class AllCiphersAdapter(HTTPAdapter):
    """
    A TransportAdapter that lets Requests talk to servers with old ciphers.

    Both the direct pool manager and the proxy manager are handed an SSL
    context built with the cipher string 'ALL:@SECLEVEL=1' and
    cert_reqs=0 (the value of ssl.CERT_NONE).
    """

    @staticmethod
    def _permissive_context():
        """Build the SSL context shared by both manager factories."""
        return create_urllib3_context(cert_reqs=0,
                                      ciphers='ALL:@SECLEVEL=1')

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with the permissive SSL context."""
        kwargs['ssl_context'] = self._permissive_context()
        return super().init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        """Create the proxy manager with the permissive SSL context."""
        kwargs['ssl_context'] = self._permissive_context()
        return super().proxy_manager_for(*args, **kwargs)
# Silence the "Unverified HTTPS request is being made" warnings (the text
# urllib3 uses for its InsecureRequestWarning).  The second argument is a
# regular expression matched against the start of the warning message.
warnings.filterwarnings('ignore', 'Unverified HTTPS request is being made')