From 2356ed56395aeeac850eb3abb9c7bf947a759e2c Mon Sep 17 00:00:00 2001 From: Oleg Broytman Date: Sat, 7 Sep 2024 17:28:54 +0300 Subject: [PATCH] Feat(Robots): Robot based on curl_multi Processes multiple URLs in parallel using concurrent.futures (multithreaded). --- Robots/bkmk_rcurlmulti.py | 105 ++++++++++++++++++++++++++++++++++++++ bkmk_db-venv | 3 +- doc/ANNOUNCE | 5 ++ doc/ChangeLog | 5 ++ robots.py | 2 +- 5 files changed, 117 insertions(+), 3 deletions(-) create mode 100644 Robots/bkmk_rcurlmulti.py diff --git a/Robots/bkmk_rcurlmulti.py b/Robots/bkmk_rcurlmulti.py new file mode 100644 index 0000000..1d4f053 --- /dev/null +++ b/Robots/bkmk_rcurlmulti.py @@ -0,0 +1,105 @@ +"""Robot based on curl_multi and concurrent.futures, +processes multiple URLs in parallel (multithreaded). + +This file is a part of Bookmarks database and Internet robot. + +""" + +__author__ = "Oleg Broytman " +__copyright__ = "Copyright (C) 2024 PhiloSoft Design" +__license__ = "GNU GPL" + +__all__ = ['robot_curlmulti'] + + +from queue import Queue, Empty as QueueEmpty +from time import sleep +import concurrent.futures +import threading + +import pycurl + +from Robots.base import robot_base +from Robots.concurrent_futures import concurrent_futures +from Robots.bkmk_rcurl import CurlWrapper + + +class robot_curlmulti(concurrent_futures): + def __init__(self, *args, **kw): + self.concurrent_class = concurrent.futures.ThreadPoolExecutor + concurrent_futures.__init__(self, *args, **kw) + # + # Rename self.log, create one log_lines list per URL (thread) + self.file_log = self.log + del self.log + self.local = threading.local() + + # Queue to pass requests to the main loop + self.queue = Queue() + + # Main loop: run curl_multi + self.executor.submit(self.curl_multi) + + def __getattr__(self, attr): + if attr != 'log': + raise AttributeError(attr) + try: + return self.local.log.append + except AttributeError: + return self.file_log + + def check_bkmk_task(self): + return self.check_bkmk_thread + + def check_bkmk_thread(self, bookmark): + self.local.log = log = [] + robot_base.check_bookmark(self, bookmark) + return bookmark, log + + async def get(self, url, req_headers, use_proxy=False): + # Queue to wait results + queue = Queue(maxsize=1) + self.queue.put((url, req_headers, use_proxy, queue)) + return queue.get() + + def curl_multi(self): + _curl_multi = pycurl.CurlMulti() + _curl_multi.wrappers = {} + + while True: + if self.queue.empty(): + pass + else: + request = self.queue.get_nowait() + if request is None: # Signal to stop + return + url, req_headers, use_proxy, queue = request + cw = CurlWrapper(self, url, req_headers, use_proxy) + _curl_multi.wrappers[cw.curl] = (cw, queue) + _curl_multi.add_handle(cw.curl) + + _curl_multi.select(1.0) + while True: + ret, num_handles = _curl_multi.perform() + _, completed, failed = _curl_multi.info_read() + for c in completed: + _curl_multi.remove_handle(c) + cw, queue = _curl_multi.wrappers.pop(c) + queue.put_nowait( + (None, cw.getinfo(pycurl.HTTP_CODE), + cw.resp_headers, cw.body) + ) + for c, errcode, errmsg in failed: + _curl_multi.remove_handle(c) + cw, queue = _curl_multi.wrappers.pop(c) + queue.put_nowait(('Error: %d %s' % (errcode, errmsg), + None, None, None)) + if ret != pycurl.E_CALL_MULTI_PERFORM: + break + if num_handles == 0: + sleep(1) + + def wait(self): + super(robot_curlmulti, self).wait() + if not self.bookmarks and not self.pending: + self.queue.put(None) # Signal to stop diff --git a/bkmk_db-venv b/bkmk_db-venv index 36bd5e4..fc99182 100644 --- a/bkmk_db-venv +++ b/bkmk_db-venv @@ -9,8 +9,7 @@ if [ -z "$VIRTUAL_ENV" ]; then . bkmk_db-venv/bin/activate && pip install --compile --upgrade setuptools \ beautifulsoup4 lxml m_lib.full \ - "requests[socks]" \ - pycurl certifi \ + "requests[socks]" pycurl certifi \ aiohttp aiohttp-socks "aioftp[socks]" } fi diff --git a/doc/ANNOUNCE b/doc/ANNOUNCE index 14fda05..49a1e52 100644 --- a/doc/ANNOUNCE +++ b/doc/ANNOUNCE @@ -11,6 +11,11 @@ Version 6.1.0 (2024-??-??) Combine aiohttp with multiaio; the combined robot is named just aio. + Robot based on curl_multi, processes multiple URLs in parallel + using concurrent.futures (multithreaded). + + Default list of robots is now multirequests,curlmulti,aio. + Make bkmk_rmultirequests always multiprocess. Version 6.0.0 (2024-08-19) diff --git a/doc/ChangeLog b/doc/ChangeLog index ad7fe9c..36c1968 100644 --- a/doc/ChangeLog +++ b/doc/ChangeLog @@ -2,6 +2,11 @@ Version 6.1.0 (2024-??-??) Combine aiohttp with multiaio; the combined robot is named just aio. + Robot based on curl_multi, processes multiple URLs in parallel + using concurrent.futures (multithreaded). + + Default list of robots is now multirequests,curlmulti,aio. + Make bkmk_rmultirequests always multiprocess. Version 6.0.0 (2024-08-19) diff --git a/robots.py b/robots.py index 0bea1d4..23fce12 100644 --- a/robots.py +++ b/robots.py @@ -16,7 +16,7 @@ from os import environ from bkmk_objects import parse_params, set_params robot_names, robot_params = parse_params( - environ.get("BKMK_ROBOT", "multirequests,multiaio,curl")) + environ.get("BKMK_ROBOT", "multirequests,curlmulti,aio")) def import_robot(robot_name): -- 2.39.5