--- /dev/null
+"""Simple robot based on twisted.web
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2025 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_twisted']
+
+import threading
+import time
+
+from twisted.internet import reactor, _sslverify
+from twisted.web.client import Agent, readBody
+from twisted.web.http_headers import Headers
+import twisted
+
+from Robots.base import robot_base
+from Robots.util import encode_url, get_ftp
+
+
+_sslverify.platformTrust = lambda: None
+
+reactor_thread = threading.Thread(target=reactor.run,
+ kwargs={'installSignalHandlers': False})
+reactor_thread.start()
+
+
+class robot_twisted(robot_base):
+ def version_str(self):
+ return 'twisted/%s' % twisted.version
+
+ async def get(self, url, req_headers, use_proxy=False):
+ if url.startswith('ftp://'):
+ error, welcome, body = get_ftp(url, self.timeout)
+ if error is not None:
+ return error, None, None, None
+ self.welcome = welcome
+ return None, None, None, body
+
+ agent = Agent(reactor, connectTimeout=self.timeout)
+ url = encode_url(url).encode('ascii')
+ _headers = {k: [v] for k, v in req_headers.items()}
+ self.response = self.error = self.body = None
+ d = agent.request(b"GET", url,
+ Headers(_headers), None)
+ d.addCallbacks(self.cbResponse, self.cbError)
+
+ for i in range(self.timeout*10):
+ if self.error is None \
+ and self.body is None:
+ time.sleep(0.1)
+
+ if self.error is None:
+ if self.body is None:
+ return 'Timeout', None, None, None
+ else:
+ error = self.error.getErrorMessage()
+ return error, None, None, None
+
+ r = self.response
+ resp_headers = {k.decode('ascii'): decode_header(v[0])
+ for k, v in r.headers.getAllRawHeaders()}
+ return None, r.code, resp_headers, self.body
+
+ def get_ftp_welcome(self):
+ welcome = self.welcome
+ self.welcome = ''
+ return welcome
+
+ def stop(self):
+ reactor.stop()
+ reactor_thread.join(self.timeout)
+
+ def cbResponse(self, response):
+ self.response = response
+
+ d = readBody(response)
+ d.addCallback(self.cbBody)
+ return d
+
+ def cbError(self, error):
+ self.error = error
+
+ def cbBody(self, body):
+ self.body = body
+
+
+def decode_header(header):
+ for encoding in 'ascii', 'latin1', 'utf-8':
+ try:
+ header = header.decode(encoding)
+ except UnicodeDecodeError:
+ pass
+ else:
+ return header
+ return ''