-"""Simple robot based on twisted.web
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman <phd@phdru.name>"
-__copyright__ = "Copyright (C) 2025 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_twisted']
-
-from urllib.parse import urlsplit
-import threading
-
-from twisted.internet import reactor, _sslverify
-from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.web.client import Agent, ProxyAgent, readBody
-from twisted.web.http_headers import Headers
-import twisted
-
-from Robots.base import robot_base
-from Robots.util import encode_url, get_ftp
-
-
# Replace twisted's private ``platformTrust`` hook with a stub that returns
# None.
# NOTE(review): presumably this makes TLS clients skip verification against
# the platform trust store -- a security trade-off; confirm it is intended.
_sslverify.platformTrust = lambda: None

# The reactor runs in its own thread, started as a side effect of importing
# this module.  Signal handlers are not installed by the reactor.
reactor_thread = threading.Thread(
    target=reactor.run,
    kwargs=dict(installSignalHandlers=False),
)
reactor_thread.start()
-
-
class robot_twisted(robot_base):
    """Robot that fetches URLs through twisted.web's ``Agent``.

    The reactor runs in the module-level ``reactor_thread``; ``get`` hands
    each request over to that thread and blocks on a ``threading.Event``
    until the request finishes or ``self.timeout`` expires.

    The per-request state (``response``, ``error``, ``body``, ``event``) is
    stored on the instance, so a single instance must not run several
    ``get`` calls concurrently.
    """

    def __init__(self, *args, **kw):
        robot_base.__init__(self, *args, **kw)
        self.event = threading.Event()

    def version_str(self):
        """Return the library identification string, e.g. ``twisted/24.3.0``."""
        # str(twisted.version) renders as "[twisted, version X]";
        # short() yields the bare version number.
        return 'twisted/%s' % twisted.version.short()

    async def get(self, url, req_headers, use_proxy=False):
        """Fetch *url* and return ``(error, status, headers, body)``.

        On failure only ``error`` (a message string) is set.  For ``ftp://``
        URLs status and headers are always ``None`` and the server welcome
        message is stored in ``self.welcome``.  For other URLs ``status`` is
        the response code, ``headers`` a dict of decoded response headers
        (first value of each) and ``body`` the response body.

        NOTE(review): although declared ``async`` this method blocks the
        calling thread while waiting -- confirm callers expect that.
        """
        if url.startswith('ftp://'):
            error, welcome, body = get_ftp(url, self.timeout)
            if error is not None:
                return error, None, None, None
            self.welcome = welcome
            return None, None, None, body

        if use_proxy:
            split_proxy = urlsplit(self.proxy)
            endpoint = TCP4ClientEndpoint(
                reactor, split_proxy.hostname, split_proxy.port)
            agent = ProxyAgent(endpoint, reactor)
        else:
            agent = Agent(reactor, connectTimeout=self.timeout)
        url = encode_url(url).encode('ascii')
        headers = Headers({k: [v] for k, v in req_headers.items()})

        # A fresh event per request: callbacks of an earlier, abandoned
        # request can only ever signal their own (old) event.
        done = threading.Event()
        started = {}

        def start_request():
            # Runs in the reactor thread.  Calls queued with callFromThread
            # run in order, so a cancellation queued by a previous get()
            # has already fired its callbacks before the state is reset.
            self.event = done
            self.response = self.error = self.body = None
            try:
                d = agent.request(b"GET", url, headers, None)
            except Exception as exc:
                # Hand synchronous failures back to the calling thread.
                started['exception'] = exc
                done.set()
                return
            started['deferred'] = d
            # Errback added after the callback so that failures while
            # reading the body reach cbError as well.
            d.addCallback(self.cbResponse)
            d.addErrback(self.cbError)

        # Twisted APIs are not thread-safe: issue the request from the
        # reactor thread, not from the caller's thread.
        reactor.callFromThread(start_request)
        finished = done.wait(self.timeout)

        if 'exception' in started:
            raise started['exception']

        if not finished:
            # Abort the pending request so that it cannot deliver a late
            # result or keep the connection open.
            d = started.get('deferred')
            if d is not None:
                reactor.callFromThread(d.cancel)
            return 'Timeout', None, None, None

        if self.error is not None:
            return self.error.getErrorMessage(), None, None, None
        if self.body is None:
            return 'Timeout', None, None, None

        r = self.response
        resp_headers = {k.decode('ascii'): decode_header(v[0])
                        for k, v in r.headers.getAllRawHeaders()}
        return None, r.code, resp_headers, self.body

    def get_ftp_welcome(self):
        """Return the stored FTP welcome message and reset it to ``''``."""
        # ``welcome`` is only assigned by an FTP fetch in get(); fall back
        # to '' when none has happened yet.
        welcome = getattr(self, 'welcome', '')
        self.welcome = ''
        return welcome

    def stop(self):
        """Stop the reactor and wait up to ``self.timeout`` for its thread."""
        # reactor.stop() must be called from the reactor thread;
        # callFromThread also wakes the reactor up so the stop is
        # processed promptly.
        reactor.callFromThread(reactor.stop)
        reactor_thread.join(self.timeout)

    def cbResponse(self, response):
        """Store the response and chain the reading of its body."""
        self.response = response

        d = readBody(response)
        d.addCallback(self.cbBody)
        return d

    def cbError(self, error):
        """Store the failure and wake up the waiting ``get``."""
        self.error = error
        self.event.set()

    def cbBody(self, body):
        """Store the response body and wake up the waiting ``get``."""
        self.body = body
        self.event.set()
-
-
def decode_header(header):
    """Decode a raw header value (``bytes``) into ``str``.

    UTF-8 is tried first (it also covers plain ASCII).  Anything that is
    not valid UTF-8 is decoded as latin1, which maps every byte to a
    character and therefore cannot fail.
    """
    try:
        return header.decode('utf-8')
    except UnicodeDecodeError:
        # latin1 must come last: tried earlier it would accept any input
        # and turn UTF-8 text into mojibake.
        return header.decode('latin1')