From 1d42c1323bd599029dec8c5d3f127c49eb29af48 Mon Sep 17 00:00:00 2001
From: Oleg Broytman
Date: Tue, 10 Sep 2024 00:18:11 +0300
Subject: [PATCH] Feat(Robots): Robot based on `twisted` and `concurrent.futures`

Processes multiple URLs in parallel (multithreaded).
---
 Robots/bkmk_rtwisted.py | 145 ++++++++++++++++++++++++++++++++++++++++
 bkmk_db-venv            |   3 +-
 doc/ANNOUNCE            |  26 ++--------
 doc/ChangeLog           |   7 +++
 robots.py               |   2 +-
 setup.py                |   1 +
 6 files changed, 160 insertions(+), 24 deletions(-)
 create mode 100644 Robots/bkmk_rtwisted.py

diff --git a/Robots/bkmk_rtwisted.py b/Robots/bkmk_rtwisted.py
new file mode 100644
index 0000000..3ee108e
--- /dev/null
+++ b/Robots/bkmk_rtwisted.py
@@ -0,0 +1,145 @@
+"""Robot based on twisted and concurrent.futures,
+processes multiple URLs in parallel (multithreaded).
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman "
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_twisted']
+
+
+from time import sleep
+
+from twisted import __version__
+from twisted.internet import reactor
+from twisted.web.client import Agent, readBody
+from twisted.web.http_headers import Headers
+
+from Robots.base import encode_url
+from Robots.concurrent_futures import cf_multithread
+
+# NOTE(review): this replaces a private Twisted hook so that it returns
+# None -- presumably to skip verification against the platform trust
+# store; confirm that this is intended.
+from twisted.internet import _sslverify
+_sslverify.platformTrust = lambda: None
+
+
+class robot_twisted(cf_multithread):
+    """Robot that fetches URLs with a twisted ``Agent``.
+
+    The reactor runs as an executor job; ``main_thread`` reads
+    requests from ``self.queue`` and passes them to the reactor thread.
+    """
+
+    def __init__(self, *args, **kw):
+        cf_multithread.__init__(self, *args, **kw)
+        # reactor.run() blocks until the reactor is stopped, so it gets
+        # its own executor job; it runs in an executor worker rather
+        # than in the main thread, hence no signal handlers.
+        self.executor.submit(reactor.run, installSignalHandlers=False)
+
+    def version_str(self):
+        """Return the inherited version string plus twisted version."""
+        # NOTE(review): super(cf_multithread, self) starts the lookup
+        # *after* cf_multithread in the MRO; confirm this is intended.
+        return super(cf_multithread, self).version_str() \
+            + '; Twisted ' + __version__
+
+    def cbRequest(self, response, queue, timeoutCall):
+        """Response headers arrived: start reading the body.
+
+        Returns the deferred of readBody() so that the outer callback
+        chain waits for cbBody().
+        """
+        # NOTE(review): the timeout is cancelled here (and in
+        # cancelTimeout), so reading the body is not covered by it.
+        if timeoutCall.active():
+            timeoutCall.cancel()
+        d = readBody(response)
+        d.addCallback(self.cbBody, response, queue)
+        return d
+
+    def cbBody(self, body, response, queue):
+        """Put ``(None, status code, headers dict, body)`` to ``queue``.
+
+        Only the first value of every header is kept.
+        """
+        # Decode headers as latin-1: unlike ascii it cannot fail
+        # on a header with 8-bit bytes.
+        queue.put_nowait(
+            (None, response.code,
+             {k.decode('latin-1').title(): v[0].decode('latin-1')
+              for k, v in response.headers.getAllRawHeaders()},
+             body)
+        )
+
+    def cbError(self, failure, queue, timeoutCall):
+        """Put ``('Error: ...', None, None, None)`` to ``queue``."""
+        if timeoutCall.active():
+            timeoutCall.cancel()
+        queue.put_nowait(('Error: %s' % failure,
+                          None, None, None))
+
+    def cancelTimeout(self, passthrough, timeoutCall):
+        """Cancel the pending timeout; pass the result through."""
+        if timeoutCall.active():
+            timeoutCall.cancel()
+        return passthrough
+
+    def _start_request(self, agent, url, req_headers, queue):
+        """Start one GET request; must be called in the reactor thread.
+
+        A failure to set the request up is reported to ``queue``
+        as ``('Error: ...', None, None, None)``.
+        """
+        try:
+            try:
+                url.encode('ascii')
+            except UnicodeEncodeError:
+                url = encode_url(url)
+            headers = Headers({k: [v] for k, v in req_headers.items()})
+            d = agent.request(b'GET', url.encode('ascii'), headers)
+        except Exception as e:
+            queue.put_nowait(('Error: %s' % e,
+                              None, None, None))
+            return
+
+        # Setup timeout watch
+        if url.startswith('ftp://'):
+            timeout = self.ftp_timeout
+        else:
+            timeout = self.timeout
+        timeoutCall = reactor.callLater(timeout, d.cancel)
+        d.addBoth(self.cancelTimeout, timeoutCall)
+
+        d.addCallback(self.cbRequest, queue, timeoutCall)
+        d.addErrback(self.cbError, queue, timeoutCall)
+
+    def main_thread(self):
+        """Main loop: create twisted agent and HTTP queries"""
+
+        agent = Agent(reactor, connectTimeout=self.timeout)
+
+        while True:
+            if not self.queue.empty():
+                request = self.queue.get_nowait()
+                if request is None:  # Signal to stop
+                    # Twisted APIs are not thread-safe: ask the reactor
+                    # thread to stop itself.
+                    reactor.callFromThread(reactor.stop)
+                    return
+                # use_proxy is ignored
+                url, req_headers, use_proxy, queue = request
+                # The request must be started in the reactor thread
+                reactor.callFromThread(
+                    self._start_request, agent, url, req_headers, queue)
+
+            sleep(0.1)
+
+    def get_ftp_welcome(self):
+        return ''  # We don't store welcome message yet
diff --git a/bkmk_db-venv b/bkmk_db-venv index fc99182..b0d8561 100644 --- a/bkmk_db-venv +++ b/bkmk_db-venv @@ -10,6 +10,7 @@ if [ -z "$VIRTUAL_ENV" ]; then pip install --compile --upgrade setuptools \ beautifulsoup4 lxml m_lib.full \ "requests[socks]" pycurl certifi \ - aiohttp aiohttp-socks "aioftp[socks]" + aiohttp aiohttp-socks "aioftp[socks]" \ + twisted pyOpenSSL service-identity } fi diff --git a/doc/ANNOUNCE b/doc/ANNOUNCE index ffa3048..f2483d1 100644 --- a/doc/ANNOUNCE +++ b/doc/ANNOUNCE @@ -7,30 +7,12 
@@ bookmarks.html. WHAT'S NEW -Version 6.1.0 (2024-09-08) +Version 6.2.0 (2024-??-??) - Combined aiohttp with multiaio; the combined robot is named just aio. + Robot based on twisted and concurrent.futures, + processes multiple URLs in parallel (multithreaded). - Robot based on curl_multi, processes multiple URLs in parallel - using concurrent.futures (multithreaded). Doesn't work good -- - slow and a number of problems; need more work. - - Combined curl with curlmulti; the combined robot is named just curl. - - Default list of robots is now multirequests,aio. - - Make bkmk_rmultirequests always multiprocess. - -Version 6.0.0 (2024-08-19) - - Robot based on aiohttp, processes multiple URLs in parallel. - - Default list of robots is now multirequests,multiaio,curl. - - Make all robots async. - Split check_bookmark() into sync and async variants. - - Renamed max_workers to max_urls. + Default list of robots is now multirequests,aio,twisted. WHERE TO GET diff --git a/doc/ChangeLog b/doc/ChangeLog index 662c110..752349c 100644 --- a/doc/ChangeLog +++ b/doc/ChangeLog @@ -1,3 +1,10 @@ +Version 6.2.0 (2024-??-??) + + Robot based on twisted and concurrent.futures, + processes multiple URLs in parallel (multithreaded). + + Default list of robots is now multirequests,aio,twisted. + Version 6.1.0 (2024-09-08) Combine aiohttp with multiaio; the combined robot is named just aio. 
diff --git a/robots.py b/robots.py index 2b80404..24bed78 100644 --- a/robots.py +++ b/robots.py @@ -15,7 +15,7 @@ from os import environ from bkmk_objects import parse_params, set_params robot_names, robot_params = parse_params( - environ.get("BKMK_ROBOT", "multirequests,aio")) + environ.get("BKMK_ROBOT", "multirequests,aio,twisted")) def import_robot(robot_name): diff --git a/setup.py b/setup.py index a140ded..14bf8e3 100755 --- a/setup.py +++ b/setup.py @@ -39,5 +39,6 @@ setup( 'requests': ['requests[socks]'], 'curl': ['pycurl', 'certifi'], 'aiohttp': ['aiohttp>=3', 'aiohttp-socks', 'aioftp[socks]'], + 'twisted': ['twisted', 'pyOpenSSL', 'service-identity'], }, ) -- 2.39.5