From 065b7db9a8ff01eca49e4dcdcdccc98981f78a61 Mon Sep 17 00:00:00 2001
From: Oleg Broytman
Date: Sun, 8 Sep 2024 10:36:12 +0300
Subject: [PATCH] Feat(Robots): Combined curl with curlmulti

The combined robot is named just curl.
---
 Robots/bkmk_rcurl.py      |  96 +++++++++++++++++++++++++++++-----
 Robots/bkmk_rcurlmulti.py | 105 --------------------------------------
 doc/ANNOUNCE              |   6 ++-
 doc/ChangeLog             |   4 +-
 robots.py                 |   2 +-
 5 files changed, 91 insertions(+), 122 deletions(-)
 delete mode 100644 Robots/bkmk_rcurlmulti.py

diff --git a/Robots/bkmk_rcurl.py b/Robots/bkmk_rcurl.py
index 73bfbe3..36c715b 100644
--- a/Robots/bkmk_rcurl.py
+++ b/Robots/bkmk_rcurl.py
@@ -1,4 +1,5 @@
-"""Robot based on PycURL
+"""Robot based on curl_multi and concurrent.futures,
+processes multiple URLs in parallel (multithreaded).
 
 This file is a part of Bookmarks database and Internet robot.
 
@@ -11,32 +12,101 @@ __license__ = "GNU GPL"
 
 __all__ = ['robot_curl']
 
 
+from queue import Queue
+from time import sleep
 from urllib.parse import urlsplit, parse_qsl, quote, quote_plus, urlencode
+import concurrent.futures
+import threading
 
 import certifi
 import pycurl
 
 from Robots.base import robot_base
+from Robots.concurrent_futures import concurrent_futures
 
 
-class robot_curl(robot_base):
+class robot_curl(concurrent_futures):
+    def __init__(self, *args, **kw):
+        self.concurrent_class = concurrent.futures.ThreadPoolExecutor
+        concurrent_futures.__init__(self, *args, **kw)
+        #
+        # Rename self.log, create one log_lines list per URL (thread)
+        self.file_log = self.log
+        del self.log
+        self.local = threading.local()
+
+        # Queue to pass requests to the main loop
+        self.queue = Queue()
+
+        # Main loop: run curl_multi
+        self.executor.submit(self.curl_multi)
+
+    def __getattr__(self, attr):
+        if attr != 'log':
+            raise AttributeError(attr)
+        try:
+            return self.local.log.append
+        except AttributeError:
+            return self.file_log
+
     def version_str(self):
         return str(pycurl.version)
 
+    def check_bkmk_task(self):
+        return self.check_bkmk_thread
+
+    def check_bkmk_thread(self, bookmark):
+        self.local.log = log = []
+        robot_base.check_bookmark(self, bookmark)
+        return bookmark, log
+
     async def get(self, url, req_headers, use_proxy=False):
-        curlw = CurlWrapper(self, url, req_headers, use_proxy)
+        # Queue to wait for results
+        queue = Queue(maxsize=1)
+        self.queue.put((url, req_headers, use_proxy, queue))
+        return queue.get()
 
-        try:
-            curlw.perform()
-        except pycurl.error as e:
-            error = str(e)
-            status = None
-        else:
-            error = None
-            status = curlw.getinfo(pycurl.HTTP_CODE)
-        curlw.close()
+    def curl_multi(self):
+        _curl_multi = pycurl.CurlMulti()
+        _curl_multi.wrappers = {}
 
-        return error, status, curlw.resp_headers, curlw.body
+        while True:
+            if self.queue.empty():
+                pass
+            else:
+                request = self.queue.get_nowait()
+                if request is None:  # Signal to stop
+                    return
+                url, req_headers, use_proxy, queue = request
+                cw = CurlWrapper(self, url, req_headers, use_proxy)
+                _curl_multi.wrappers[cw.curl] = (cw, queue)
+                _curl_multi.add_handle(cw.curl)
+
+            _curl_multi.select(1.0)
+            while True:
+                ret, num_handles = _curl_multi.perform()
+                _, completed, failed = _curl_multi.info_read()
+                for c in completed:
+                    _curl_multi.remove_handle(c)
+                    cw, queue = _curl_multi.wrappers.pop(c)
+                    queue.put_nowait(
+                        (None, cw.getinfo(pycurl.HTTP_CODE),
+                         cw.resp_headers, cw.body)
+                    )
+                for c, errcode, errmsg in failed:
+                    _curl_multi.remove_handle(c)
+                    cw, queue = _curl_multi.wrappers.pop(c)
+                    queue.put_nowait(('Error: %d %s' % (errcode, errmsg),
+                                      None, None, None))
+                if ret != pycurl.E_CALL_MULTI_PERFORM:
+                    break
+            if num_handles == 0:
+                sleep(1)
+
+    def wait(self):
+        super(robot_curl, self).wait()
+        if not self.bookmarks and not self.pending:
+            self.queue.put(None)  # Signal to stop
 
     def get_ftp_welcome(self):
         return ''  # We don't store welcome message yet
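Note for reviewers: the hand-off between worker threads and the single
curl_multi loop above can be exercised standalone.  The following is a
minimal sketch of the same pattern, not code from this patch: the names
fetch, multi_loop and STOP, and the use of WRITEDATA with a BytesIO
buffer, are illustrative assumptions; only pycurl.CurlMulti, the
request/reply queues and the perform/info_read loop mirror the change.

    from io import BytesIO
    from queue import Queue
    from time import sleep
    from concurrent.futures import ThreadPoolExecutor

    import pycurl

    STOP = None
    requests = Queue()  # worker threads -> multi loop

    def fetch(url):
        # Runs in a worker thread: enqueue the request together with
        # a private reply queue and block until the transfer finishes.
        reply = Queue(maxsize=1)
        requests.put((url, reply))
        return reply.get()  # (error, http_status, body)

    def multi_loop():
        # Runs in one dedicated thread: drives all transfers at once.
        multi = pycurl.CurlMulti()
        handles = {}  # curl handle -> (buffer, reply queue)
        while True:
            while not requests.empty():
                request = requests.get_nowait()
                if request is STOP:
                    return
                url, reply = request
                buf = BytesIO()
                c = pycurl.Curl()
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.WRITEDATA, buf)
                handles[c] = (buf, reply)
                multi.add_handle(c)
            multi.select(1.0)
            while True:
                ret, num_handles = multi.perform()
                _, completed, failed = multi.info_read()
                for c in completed:
                    multi.remove_handle(c)
                    buf, reply = handles.pop(c)
                    reply.put((None, c.getinfo(pycurl.HTTP_CODE),
                               buf.getvalue()))
                for c, errcode, errmsg in failed:
                    multi.remove_handle(c)
                    buf, reply = handles.pop(c)
                    reply.put(('Error: %d %s' % (errcode, errmsg),
                               None, None))
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            if num_handles == 0:
                sleep(1)  # nothing in flight, avoid a busy loop

    with ThreadPoolExecutor() as executor:
        loop = executor.submit(multi_loop)
        print(fetch('https://example.com/')[1])  # HTTP status
        requests.put(STOP)
        loop.result()

The reply queue of size 1 is what lets each worker thread keep a simple
synchronous shape while one thread multiplexes all transfers.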
diff --git a/Robots/bkmk_rcurlmulti.py b/Robots/bkmk_rcurlmulti.py
deleted file mode 100644
index 1d4f053..0000000
--- a/Robots/bkmk_rcurlmulti.py
+++ /dev/null
@@ -1,105 +0,0 @@
-"""Robot based on curl_multi and concurrent.futures,
-processes multiple URLs in parallel (multithreaded).
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman "
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_curlmulti']
-
-
-from queue import Queue, Empty as QueueEmpty
-from time import sleep
-import concurrent.futures
-import threading
-
-import pycurl
-
-from Robots.base import robot_base
-from Robots.concurrent_futures import concurrent_futures
-from Robots.bkmk_rcurl import CurlWrapper
-
-
-class robot_curlmulti(concurrent_futures):
-    def __init__(self, *args, **kw):
-        self.concurrent_class = concurrent.futures.ThreadPoolExecutor
-        concurrent_futures.__init__(self, *args, **kw)
-        #
-        # Rename self.log, create one log_lines list per URL (thread)
-        self.file_log = self.log
-        del self.log
-        self.local = threading.local()
-
-        # Queue to pass requests to the main loop
-        self.queue = Queue()
-
-        # Main loop: run curl_multi
-        self.executor.submit(self.curl_multi)
-
-    def __getattr__(self, attr):
-        if attr != 'log':
-            raise AttributeError(attr)
-        try:
-            return self.local.log.append
-        except AttributeError:
-            return self.file_log
-
-    def check_bkmk_task(self):
-        return self.check_bkmk_thread
-
-    def check_bkmk_thread(self, bookmark):
-        self.local.log = log = []
-        robot_base.check_bookmark(self, bookmark)
-        return bookmark, log
-
-    async def get(self, url, req_headers, use_proxy=False):
-        # Queue to wait results
-        queue = Queue(maxsize=1)
-        self.queue.put((url, req_headers, use_proxy, queue))
-        return queue.get()
-
-    def curl_multi(self):
-        _curl_multi = pycurl.CurlMulti()
-        _curl_multi.wrappers = {}
-
-        while True:
-            if self.queue.empty():
-                pass
-            else:
-                request = self.queue.get_nowait()
-                if request is None:  # Signal to stop
-                    return
-                url, req_headers, use_proxy, queue = request
-                cw = CurlWrapper(self, url, req_headers, use_proxy)
-                _curl_multi.wrappers[cw.curl] = (cw, queue)
-                _curl_multi.add_handle(cw.curl)
-
-            _curl_multi.select(1.0)
-            while True:
-                ret, num_handles = _curl_multi.perform()
-                _, completed, failed = _curl_multi.info_read()
-                for c in completed:
-                    _curl_multi.remove_handle(c)
-                    cw, queue = _curl_multi.wrappers.pop(c)
-                    queue.put_nowait(
-                        (None, cw.getinfo(pycurl.HTTP_CODE),
-                         cw.resp_headers, cw.body)
-                    )
-                for c, errcode, errmsg in failed:
-                    _curl_multi.remove_handle(c)
-                    cw, queue = _curl_multi.wrappers.pop(c)
-                    queue.put_nowait(('Error: %d %s' % (errcode, errmsg),
-                                      None, None, None))
-                if ret != pycurl.E_CALL_MULTI_PERFORM:
-                    break
-            if num_handles == 0:
-                sleep(1)
-
-    def wait(self):
-        super(robot_curlmulti, self).wait()
-        if not self.bookmarks and not self.pending:
-            self.queue.put(None)  # Signal to stop
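The per-thread logging trick that the combined robot keeps from the
deleted file (__getattr__ plus threading.local) is easy to miss in the
diff.  Here is a minimal self-contained sketch of just that mechanism;
the names ThreadLogs and start_thread_log are illustrative and do not
appear in the patch:

    import threading
    from concurrent.futures import ThreadPoolExecutor

    class ThreadLogs:
        def __init__(self, file_log):
            self.file_log = file_log        # shared fallback logger
            self.local = threading.local()  # per-thread storage

        def __getattr__(self, attr):
            # Called only for attributes not found normally, i.e. 'log'.
            if attr != 'log':
                raise AttributeError(attr)
            try:
                # In a worker thread: append to this thread's own list.
                return self.local.log.append
            except AttributeError:
                # No per-thread list yet: fall back to the shared logger.
                return self.file_log

        def start_thread_log(self):
            self.local.log = log = []
            return log

    logs = ThreadLogs(print)
    logs.log('main thread: goes to the shared logger')

    def worker(n):
        lines = logs.start_thread_log()
        logs.log('checked URL #%d' % n)
        return lines

    with ThreadPoolExecutor() as executor:
        print(executor.submit(worker, 1).result())  # ['checked URL #1']

Outside a worker thread there is no local.log, so attribute lookup falls
back to the shared file logger, exactly as robot_curl.__getattr__ does;
check_bkmk_thread returns each bookmark's log lines to be written out in
one piece, so output from parallel checks does not interleave.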
diff --git a/doc/ANNOUNCE b/doc/ANNOUNCE
index 49a1e52..3962e5a 100644
--- a/doc/ANNOUNCE
+++ b/doc/ANNOUNCE
@@ -9,12 +9,14 @@ WHAT'S NEW
 Version 6.1.0 (2024-??-??)
 
-   Combine aiohttp with multiaio; the combined robot is named just aio.
+   Combined aiohttp with multiaio; the combined robot is named just aio.
 
    Robot based on curl_multi, processes multiple URLs in parallel
    using concurrent.futures (multithreaded).
 
-   Default list of robots is now multirequests,curlmulti,aio.
+   Combined curl with curlmulti; the combined robot is named just curl.
+
+   Default list of robots is now multirequests,aio,curl.
 
    Make bkmk_rmultirequests always multiprocess.
diff --git a/doc/ChangeLog b/doc/ChangeLog
index 36c1968..54fa802 100644
--- a/doc/ChangeLog
+++ b/doc/ChangeLog
@@ -5,7 +5,9 @@ Version 6.1.0 (2024-??-??)
    Robot based on curl_multi, processes multiple URLs in parallel
    using concurrent.futures (multithreaded).
 
-   Default list of robots is now multirequests,curlmulti,aio.
+   Combined curl with curlmulti; the combined robot is named just curl.
+
+   Default list of robots is now multirequests,aio,curl.
 
    Make bkmk_rmultirequests always multiprocess.
diff --git a/robots.py b/robots.py
index 23fce12..7b79976 100644
--- a/robots.py
+++ b/robots.py
@@ -16,7 +16,7 @@ from os import environ
 from bkmk_objects import parse_params, set_params
 
 robot_names, robot_params = parse_params(
-    environ.get("BKMK_ROBOT", "multirequests,curlmulti,aio"))
+    environ.get("BKMK_ROBOT", "multirequests,aio,curl"))
 
 
 def import_robot(robot_name):
-- 
2.39.5
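The robots.py change only swaps the default value of the BKMK_ROBOT
environment variable, so the new combined robot can be exercised alone
without touching the code.  A quick sketch, run from a working copy of
the repository; it assumes parse_params turns the comma-separated value
into the robot_names list that robots.py exports, as the hunk above
suggests:

    import os
    os.environ['BKMK_ROBOT'] = 'curl'  # override the new default list

    import robots  # robots.py reads BKMK_ROBOT at import time
    print(robots.robot_names)  # expected: ['curl']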