From 26fa66e8c8f58e0c9b9e189c46698fb32022a54a Mon Sep 17 00:00:00 2001
From: Oleg Broytman
Date: Tue, 10 Sep 2024 18:01:21 +0300
Subject: [PATCH] Feat(Robots): Remove slow or problematic robots curl,
 multiaio, twisted.

---
 Robots/bkmk_rcurl.py         | 124 -----------------------------------
 Robots/bkmk_rmultiaio.py     |  78 ----------------------
 Robots/bkmk_rtwisted.py      | 111 ------------------------------
 Robots/concurrent_futures.py |  44 -------------
 bkmk_db-venv                 |   5 +-
 doc/ANNOUNCE                 |  12 ----
 doc/ChangeLog                |  12 ----
 setup.py                     |   2 -
 8 files changed, 2 insertions(+), 386 deletions(-)
 delete mode 100644 Robots/bkmk_rcurl.py
 delete mode 100644 Robots/bkmk_rmultiaio.py
 delete mode 100644 Robots/bkmk_rtwisted.py

diff --git a/Robots/bkmk_rcurl.py b/Robots/bkmk_rcurl.py
deleted file mode 100644
index 7bd0907..0000000
--- a/Robots/bkmk_rcurl.py
+++ /dev/null
@@ -1,124 +0,0 @@
-"""Robot based on curl_multi and concurrent.futures,
-processes multiple URLs in parallel (multithreaded).
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman "
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_curl']
-
-
-from time import sleep
-
-import certifi
-import pycurl
-
-from Robots.base import encode_url
-from Robots.concurrent_futures import cf_multithread
-
-
-class robot_curl(cf_multithread):
-    def version_str(self):
-        return super(cf_multithread, self).version_str() \
-            + '; ' + str(pycurl.version)
-
-    def main_thread(self):
-        """Main loop: run curl_multi"""
-
-        _curl_multi = pycurl.CurlMulti()
-        _curl_multi.wrappers = {}
-
-        while True:
-            if self.queue.empty():
-                pass
-            else:
-                request = self.queue.get_nowait()
-                if request is None:  # Signal to stop
-                    return
-                url, req_headers, use_proxy, queue = request
-                cw = CurlWrapper(self, url, req_headers, use_proxy)
-                _curl_multi.wrappers[cw.curl] = (cw, queue)
-                _curl_multi.add_handle(cw.curl)
-
-            _curl_multi.select(1.0)
-            while True:
-                ret, num_handles = _curl_multi.perform()
-                _, completed, failed = _curl_multi.info_read()
-                for c in completed:
-                    _curl_multi.remove_handle(c)
-                    cw, queue = _curl_multi.wrappers.pop(c)
-                    queue.put_nowait(
-                        (None, cw.getinfo(pycurl.HTTP_CODE),
-                         cw.resp_headers, cw.body)
-                    )
-                for c, errcode, errmsg in failed:
-                    _curl_multi.remove_handle(c)
-                    cw, queue = _curl_multi.wrappers.pop(c)
-                    queue.put_nowait(('Error: %d %s' % (errcode, errmsg),
-                                      None, None, None))
-                if ret != pycurl.E_CALL_MULTI_PERFORM:
-                    break
-            if num_handles == 0:
-                sleep(1)
-
-
-class CurlWrapper:
-    def __init__(self, robot, url, req_headers, use_proxy):
-        req_headers = ['%s: %s' % (k, v) for k, v in req_headers.items()]
-
-        self.curl = curl = pycurl.Curl()
-        self.resp_headers = {}
-        self.body = b''
-
-        # Do not follow redirects
-        curl.setopt(pycurl.FOLLOWLOCATION, 0)
-        # Lower security settings - we need to get as much as possible
-        curl.setopt(pycurl.SSL_CIPHER_LIST, 'ALL:@SECLEVEL=1')
-        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
-        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
-        curl.setopt(curl.CAINFO, certifi.where())
-        # Set timeouts to avoid hanging too long
-        curl.setopt(pycurl.CONNECTTIMEOUT, robot.timeout)
-        curl.setopt(pycurl.TIMEOUT, robot.timeout)
-        # Parse Last-Modified
-        curl.setopt(pycurl.OPT_FILETIME, 1)
-
-        if use_proxy:
-            curl.setopt(pycurl.PROXY, robot.proxy)
-
-        # Set up a callback to capture the headers and the body
-        curl.setopt(pycurl.HEADERFUNCTION, self.header_callback)
-        curl.setopt(pycurl.WRITEFUNCTION, self.body_callback)
-
-        curl.setopt(pycurl.HTTPGET, 1)
-        curl.setopt(pycurl.HTTPHEADER, req_headers)
-        try:
-            url.encode('ascii')
-        except UnicodeEncodeError:
-            url = encode_url(url)
-        curl.setopt(pycurl.URL, url)
-
-    def __getattr__(self, attr):
-        return getattr(self.curl, attr)
-
-    def header_callback(self, data):
-        for encoding in 'ascii', 'latin1', 'utf-8':
-            try:
-                data = data.decode(encoding)
-            except UnicodeDecodeError:
-                pass
-            else:
-                break
-        else:
-            print("Error decoding header:", data)
-            return
-        if ':' in data:
-            key, value = data.split(':', 1)
-            self.resp_headers[key.title()] = value.strip()
-
-    def body_callback(self, data):
-        self.body += data
diff --git a/Robots/bkmk_rmultiaio.py b/Robots/bkmk_rmultiaio.py
deleted file mode 100644
index 78cdaa2..0000000
--- a/Robots/bkmk_rmultiaio.py
+++ /dev/null
@@ -1,78 +0,0 @@
-"""Robot based on aiohttp and concurrent.futures,
-processes multiple URLs in parallel (multithreaded).
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman "
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_multiaio']
-
-
-import asyncio
-import aiohttp
-
-from Robots.bkmk_raio import _get_http, _get_ftp
-from Robots.concurrent_futures import cf_multithread
-
-
-class robot_multiaio(cf_multithread):
-    def __init__(self, *args, **kw):
-        self.async_pending = set()  # pending async tasks
-        cf_multithread.__init__(self, *args, **kw)
-
-    def version_str(self):
-        return 'aiohttp/%s; multithreaded' % aiohttp.__version__
-
-    def main_thread(self):
-        asyncio.run(self.main_thread_async())
-
-    async def main_thread_async(self):
-        """Main loop"""
-
-        while True:
-            if self.queue.empty():
-                pass
-            else:
-                request = self.queue.get_nowait()
-                if request is None:  # Signal to stop
-                    return
-                url, req_headers, use_proxy, queue = request
-
-                task = asyncio.create_task(
-                    self.get_url_task(url, req_headers, use_proxy, queue))
-                self.async_pending.add(task)
-
-            if self.async_pending:
-                done, async_pending = await asyncio.wait(
-                    self.async_pending, timeout=self.timeout,
-                    return_when=asyncio.FIRST_COMPLETED)
-                self.async_pending = async_pending
-
-                for task in done:
-                    error, status, resp_headers, body, queue = task.result()
-                    queue.put_nowait((error, status, resp_headers, body))
-
-    async def get_url_task(self, url, req_headers, use_proxy, queue):
-        if url.startswith('ftp://'):
-            error, body = await _get_ftp(
-                url, timeout=self.timeout,
-            )
-            if error is not None:
-                error = str(error)
-                return error, None, None, None, queue
-            return None, None, None, body, queue
-
-        if use_proxy:
-            proxy = self.proxy
-        else:
-            proxy = None
-
-        error, status, resp_headers, body = await _get_http(
-            url, req_headers, proxy=proxy,
-            timeout=self.timeout,
-        )
-        return error, status, resp_headers, body, queue
diff --git a/Robots/bkmk_rtwisted.py b/Robots/bkmk_rtwisted.py
deleted file mode 100644
index 47e09da..0000000
--- a/Robots/bkmk_rtwisted.py
+++ /dev/null
@@ -1,111 +0,0 @@
-"""Robot based on twisted and concurrent.futures,
-processes multiple URLs in parallel (multithreaded).
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman "
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_twisted']
-
-
-from urllib.parse import urlsplit
-from time import sleep
-
-from twisted import __version__
-from twisted.internet import reactor
-from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.web.client import Agent, ProxyAgent, readBody
-from twisted.web.http_headers import Headers
-
-from Robots.base import encode_url
-from Robots.concurrent_futures import cf_multithread
-
-from twisted.internet import _sslverify
-_sslverify.platformTrust = lambda: None
-
-
-class robot_twisted(cf_multithread):
-    def __init__(self, *args, **kw):
-        cf_multithread.__init__(self, *args, **kw)
-        self.executor.submit(reactor.run, installSignalHandlers=False)
-
-    def version_str(self):
-        return super(cf_multithread, self).version_str() \
-            + '; Twisted ' + __version__
-
-    def cbRequest(self, response, queue, timeoutCall):
-        if timeoutCall.active():
-            timeoutCall.cancel()
-        d = readBody(response)
-        d.addCallback(self.cbBody, response, queue)
-        return d
-
-    def cbBody(self, body, response, queue):
-        queue.put_nowait(
-            (None, response.code,
-             {k.decode('ascii').title(): v[0].decode('ascii')
-              for k, v in response.headers.getAllRawHeaders()},
-             body)
-        )
-
-    def cbError(self, failure, queue, timeoutCall):
-        if timeoutCall.active():
-            timeoutCall.cancel()
-        queue.put_nowait(('Error: %s' % failure,
-                          None, None, None))
-
-    def cancelTimeout(self, passthrough, timeoutCall):
-        if timeoutCall.active():
-            timeoutCall.cancel()
-        return passthrough
-
-    def main_thread(self):
-        """Main loop: create twisted agent and HTTP queries"""
-
-        direct_agent = Agent(reactor, connectTimeout=self.timeout)
-
-        if self.proxy and self.proxy.startswith('http'):
-            proxy = urlsplit(self.proxy)
-            endpoint = TCP4ClientEndpoint(
-                reactor, proxy.hostname, proxy.port, timeout=self.timeout)
-            proxy_agent = ProxyAgent(endpoint)
-
-        while True:
-            if self.queue.empty():
-                pass
-            else:
-                request = self.queue.get_nowait()
-                if request is None:  # Signal to stop
-                    reactor.stop()
-                    return
-                url, req_headers, use_proxy, queue = request
-
-                try:
-                    url.encode('ascii')
-                except UnicodeEncodeError:
-                    url = encode_url(url)
-                req_headers = {k: [v] for k, v in req_headers.items()}
-                if use_proxy:
-                    agent = proxy_agent
-                else:
-                    agent = direct_agent
-                try:
-                    d = agent.request(b'GET', url.encode('ascii'),
-                                      Headers(req_headers))
-                except Exception as e:
-                    queue.put_nowait(('Error: %s' % e,
-                                      None, None, None))
-                    continue
-
-                # Setup timeout watch
-                timeoutCall = reactor.callLater(self.timeout, d.cancel)
-                d.addBoth(self.cancelTimeout, timeoutCall)
-
-                d.addCallback(self.cbRequest, queue, timeoutCall)
-                d.addErrback(self.cbError, queue, timeoutCall)
-
-            sleep(0.1)
diff --git a/Robots/concurrent_futures.py b/Robots/concurrent_futures.py
index 5d1f8d0..aa46019 100644
--- a/Robots/concurrent_futures.py
+++ b/Robots/concurrent_futures.py
@@ -84,47 +84,3 @@ class cf_multiprocess(concurrent_futures):
     def __init__(self, *args, **kw):
         self.concurrent_class = concurrent.futures.ProcessPoolExecutor
         concurrent_futures.__init__(self, *args, **kw)
-
-
-class cf_multithread(concurrent_futures):
-    def __init__(self, *args, **kw):
-        self.concurrent_class = concurrent.futures.ThreadPoolExecutor
-        concurrent_futures.__init__(self, *args, **kw)
-
-        # Rename self.log, create one log_lines list per URL (thread)
-        self.file_log = self.log
-        del self.log
-        self.local = threading.local()
-
-        # Queue to pass requests to the main loop
-        self.queue = Queue()
-
-        # Main loop
-        self.executor.submit(self.main_thread)
-
-    def __getattr__(self, attr):
-        if attr != 'log':
-            raise AttributeError(attr)
-        try:
-            return self.local.log.append
-        except AttributeError:
-            return self.file_log
-
-    def check_bkmk_task(self):
-        return self.check_bkmk_thread
-
-    def check_bkmk_thread(self, bookmark):
-        self.local.log = log = []
-        robot_base.check_bookmark(self, bookmark)
-        return bookmark, log
-
-    async def get(self, url, req_headers, use_proxy=False):
-        # Queue to wait results
-        queue = Queue(maxsize=1)
-        self.queue.put((url, req_headers, use_proxy, queue))
-        return queue.get()
-
-    def wait(self):
-        super(cf_multithread, self).wait()
-        if not self.bookmarks and not self.pending:
-            self.queue.put(None)  # Signal to stop
diff --git a/bkmk_db-venv b/bkmk_db-venv
index b0d8561..6961f5d 100644
--- a/bkmk_db-venv
+++ b/bkmk_db-venv
@@ -9,8 +9,7 @@ if [ -z "$VIRTUAL_ENV" ]; then
     . bkmk_db-venv/bin/activate &&
     pip install --compile --upgrade setuptools \
       beautifulsoup4 lxml m_lib.full \
-      "requests[socks]" pycurl certifi \
-      aiohttp aiohttp-socks "aioftp[socks]" \
-      twisted pyOpenSSL service-identity
+      "requests[socks]" \
+      aiohttp aiohttp-socks "aioftp[socks]"
 }
 fi
diff --git a/doc/ANNOUNCE b/doc/ANNOUNCE
index 9bced52..3ff72c7 100644
--- a/doc/ANNOUNCE
+++ b/doc/ANNOUNCE
@@ -9,18 +9,6 @@ WHAT'S NEW
 
 Version 6.2.0 (2024-??-??)
 
-   Robot based on aiohttp and concurrent.futures,
-   processes multiple URLs in parallel (multithreaded).
-   Works slowly, the same way as curl.
-
-   Default list of robots is multirequests,aio.
-
-   Robot based on twisted and concurrent.futures,
-   processes multiple URLs in parallel (multithreaded).
-   Doesn't properly support proxies; has problems with HTTP proxy
-   and doesn't support SOCKS5 proxy at all.
-   Doesn't query FTP; requires more work.
-
    Robots: Removed ftp_timeout.
 
 
diff --git a/doc/ChangeLog b/doc/ChangeLog
index e0e4589..1cafec6 100644
--- a/doc/ChangeLog
+++ b/doc/ChangeLog
@@ -1,17 +1,5 @@
 Version 6.2.0 (2024-??-??)
 
-   Robot based on aiohttp and concurrent.futures,
-   processes multiple URLs in parallel (multithreaded).
-   Works slowly, the same way as curl.
-
-   Default list of robots is multirequests,aio.
-
-   Robot based on twisted and concurrent.futures,
-   processes multiple URLs in parallel (multithreaded).
-   Doesn't properly support proxies; has problems with HTTP proxy
-   and doesn't support SOCKS5 proxy at all.
-   Doesn't query FTP; requires more work.
-
    Robots: Removed ftp_timeout.
 
 Version 6.1.0 (2024-09-08)
diff --git a/setup.py b/setup.py
index 14bf8e3..9bb3ef1 100755
--- a/setup.py
+++ b/setup.py
@@ -37,8 +37,6 @@ setup(
     extras_require={
         'html': ['beautifulsoup4', 'lxml'],
         'requests': ['requests[socks]'],
-        'curl': ['pycurl', 'certifi'],
         'aiohttp': ['aiohttp>=3', 'aiohttp-socks', 'aioftp[socks]'],
-        'twisted': ['twisted', 'pyOpenSSL', 'service-identity'],
     },
 )
-- 
2.39.5
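
Note (below the signature, so `git am` ignores it): the three removed robots all
shared the cf_multithread request/reply protocol deleted above from
Robots/concurrent_futures.py — each worker thread puts a
(url, req_headers, use_proxy, reply_queue) tuple on a shared queue, one network
loop serves those requests and answers on the per-URL reply queue with an
(error, status, resp_headers, body) tuple, and a None request stops the loop.
The following minimal, self-contained sketch illustrates that hand-off for
anyone reviving one of these robots; fetch_url is a stub standing in for the
real network code, and all names and headers here are illustrative, not taken
from the repository.

    # Illustrative sketch only -- not part of the patch or the bkmk codebase.
    import threading
    from queue import Queue

    request_queue = Queue()  # worker threads -> single network loop

    def fetch_url(url, req_headers, use_proxy):
        # Stub standing in for the real network code (curl/aiohttp/twisted).
        return None, 200, {'Content-Type': 'text/plain'}, b'fake body'

    def network_loop():
        while True:
            request = request_queue.get()
            if request is None:  # signal to stop, as in cf_multithread.wait()
                return
            url, req_headers, use_proxy, reply_queue = request
            # The reply is always a 4-tuple: (error, status, resp_headers, body)
            reply_queue.put_nowait(fetch_url(url, req_headers, use_proxy))

    def check_url(url):
        reply_queue = Queue(maxsize=1)  # one reply queue per URL, as in get()
        request_queue.put((url, {'User-Agent': 'bkmk'}, False, reply_queue))
        error, status, resp_headers, body = reply_queue.get()
        print(url, error, status, len(body))

    loop = threading.Thread(target=network_loop)
    loop.start()
    workers = [threading.Thread(target=check_url, args=(u,))
               for u in ('http://example.com/a', 'http://example.com/b')]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    request_queue.put(None)  # stop the network loop
    loop.join()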