git.phdru.name Git - bookmarks_db.git/commitdiff
Feat(Robots): Robot based on curl_multi
author Oleg Broytman <phd@phdru.name>
Sat, 7 Sep 2024 14:28:54 +0000 (17:28 +0300)
committer Oleg Broytman <phd@phdru.name>
Sun, 8 Sep 2024 07:30:40 +0000 (10:30 +0300)
Processes multiple URLs in parallel using concurrent.futures (multithreaded).

Robots/bkmk_rcurlmulti.py [new file with mode: 0644]
bkmk_db-venv
doc/ANNOUNCE
doc/ChangeLog
robots.py
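
The new robot combines a thread pool with a single curl_multi loop: worker
threads check bookmarks and block on a per-request reply queue while one
dedicated thread drives all pycurl transfers. Below is a minimal,
self-contained sketch of that hand-off pattern; every name in it is
illustrative, not part of bookmarks_db.

from io import BytesIO
from queue import Queue
from threading import Thread
from time import sleep

import pycurl

request_queue = Queue()  # (url, reply_queue) pairs, or None to stop


def curl_multi_loop():
    multi = pycurl.CurlMulti()
    transfers = {}  # curl handle -> (body buffer, reply queue)
    while True:
        if not request_queue.empty():
            item = request_queue.get_nowait()
            if item is None:  # stop signal
                return
            url, reply = item
            curl = pycurl.Curl()
            body = BytesIO()
            curl.setopt(pycurl.URL, url)
            curl.setopt(pycurl.WRITEDATA, body)
            transfers[curl] = (body, reply)
            multi.add_handle(curl)
        multi.select(1.0)
        while True:
            ret, num_handles = multi.perform()
            _, completed, failed = multi.info_read()
            for curl in completed:
                multi.remove_handle(curl)
                body, reply = transfers.pop(curl)
                reply.put((None, curl.getinfo(pycurl.HTTP_CODE),
                           body.getvalue()))
                curl.close()
            for curl, errcode, errmsg in failed:
                multi.remove_handle(curl)
                _, reply = transfers.pop(curl)
                reply.put(('Error: %d %s' % (errcode, errmsg), None, None))
                curl.close()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        if num_handles == 0:
            sleep(0.1)  # nothing in flight; avoid busy-waiting


def fetch(url):  # called from any worker thread
    reply = Queue(maxsize=1)
    request_queue.put((url, reply))
    return reply.get()  # blocks the calling thread only


loop = Thread(target=curl_multi_loop)
loop.start()
print(fetch('https://example.com/')[1])  # -> HTTP status code
request_queue.put(None)  # ask the loop to stop
loop.join()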

diff --git a/Robots/bkmk_rcurlmulti.py b/Robots/bkmk_rcurlmulti.py
new file mode 100644 (file)
index 0000000..1d4f053
--- /dev/null
+++ b/Robots/bkmk_rcurlmulti.py
@@ -0,0 +1,105 @@
+"""Robot based on curl_multi and concurrent.futures,
+processes multiple URLs in parallel (multithreaded).
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_curlmulti']
+
+
+from queue import Queue, Empty as QueueEmpty
+from time import sleep
+import concurrent.futures
+import threading
+
+import pycurl
+
+from Robots.base import robot_base
+from Robots.concurrent_futures import concurrent_futures
+from Robots.bkmk_rcurl import CurlWrapper
+
+
+class robot_curlmulti(concurrent_futures):
+    def __init__(self, *args, **kw):
+        self.concurrent_class = concurrent.futures.ThreadPoolExecutor
+        concurrent_futures.__init__(self, *args, **kw)
+        #
+        # Rename self.log, create one log_lines list per URL (thread)
+        self.file_log = self.log
+        del self.log
+        self.local = threading.local()
+
+        # Queue to pass requests to the main loop
+        self.queue = Queue()
+
+        # Main loop: run curl_multi
+        self.executor.submit(self.curl_multi)
+
+    def __getattr__(self, attr):
+        if attr != 'log':
+            raise AttributeError(attr)
+        try:
+            return self.local.log.append
+        except AttributeError:
+            return self.file_log
+
+    def check_bkmk_task(self):
+        return self.check_bkmk_thread
+
+    def check_bkmk_thread(self, bookmark):
+        self.local.log = log = []
+        robot_base.check_bookmark(self, bookmark)
+        return bookmark, log
+
+    async def get(self, url, req_headers, use_proxy=False):
+        # Queue to wait results
+        queue = Queue(maxsize=1)
+        self.queue.put((url, req_headers, use_proxy, queue))
+        return queue.get()
+
+    def curl_multi(self):
+        _curl_multi = pycurl.CurlMulti()
+        _curl_multi.wrappers = {}
+
+        while True:
+            if self.queue.empty():
+                pass
+            else:
+                request = self.queue.get_nowait()
+                if request is None:  # Signal to stop
+                    return
+                url, req_headers, use_proxy, queue = request
+                cw = CurlWrapper(self, url, req_headers, use_proxy)
+                _curl_multi.wrappers[cw.curl] = (cw, queue)
+                _curl_multi.add_handle(cw.curl)
+
+            _curl_multi.select(1.0)
+            while True:
+                ret, num_handles = _curl_multi.perform()
+                _, completed, failed = _curl_multi.info_read()
+                for c in completed:
+                    _curl_multi.remove_handle(c)
+                    cw, queue = _curl_multi.wrappers.pop(c)
+                    queue.put_nowait(
+                        (None, cw.getinfo(pycurl.HTTP_CODE),
+                         cw.resp_headers, cw.body)
+                    )
+                for c, errcode, errmsg in failed:
+                    _curl_multi.remove_handle(c)
+                    cw, queue = _curl_multi.wrappers.pop(c)
+                    queue.put_nowait(('Error: %d %s' % (errcode, errmsg),
+                                      None, None, None))
+                if ret != pycurl.E_CALL_MULTI_PERFORM:
+                    break
+            if num_handles == 0:
+                sleep(1)
+
+    def wait(self):
+        super(robot_curlmulti, self).wait()
+        if not self.bookmarks and not self.pending:
+            self.queue.put(None)  # Signal to stop
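
One detail worth calling out in the file above: robot_base reports through
self.log, and with several checks running in parallel those lines must not
interleave, so __getattr__ hands back either the current thread's private
list (its append method) or the real file logger. A reduced sketch of the
same idea, with hypothetical names:

import threading


class PerThreadLog:
    def __init__(self, file_log):
        self.file_log = file_log        # the real logger
        self.local = threading.local()  # holds one 'log' list per thread

    def __getattr__(self, attr):
        if attr != 'log':
            raise AttributeError(attr)
        try:
            return self.local.log.append  # collect lines thread-locally
        except AttributeError:
            return self.file_log          # no per-thread list yet

    def run(self, work):
        self.local.log = lines = []
        work(self)     # work() reports via self.log(...)
        return lines   # the caller can flush them without interleaving


r = PerThreadLog(print)
r.log('no per-thread list yet: goes straight to the file log')
print(r.run(lambda s: s.log('collected in %s'
                            % threading.current_thread().name)))
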
diff --git a/bkmk_db-venv b/bkmk_db-venv
index 36bd5e4bc658f51b953ffa350be8f42462cbeaac..fc991828c9ea214426781c65e6c1c61b8b1df5d1 100644 (file)
--- a/bkmk_db-venv
+++ b/bkmk_db-venv
@@ -9,8 +9,7 @@ if [ -z "$VIRTUAL_ENV" ]; then
          . bkmk_db-venv/bin/activate &&
          pip install --compile --upgrade setuptools \
          beautifulsoup4 lxml m_lib.full \
-         "requests[socks]" \
-         pycurl certifi \
+         "requests[socks]" pycurl certifi \
          aiohttp aiohttp-socks "aioftp[socks]"
     }
 fi
diff --git a/doc/ANNOUNCE b/doc/ANNOUNCE
index 14fda05c99174dbef2610053a10d50ee9740d0f5..49a1e525db646850710487835c98c5bce8134fb6 100644 (file)
--- a/doc/ANNOUNCE
+++ b/doc/ANNOUNCE
@@ -11,6 +11,11 @@ Version 6.1.0 (2024-??-??)
 
    Combine aiohttp with multiaio; the combined robot is named just aio.
 
+   Robot based on curl_multi, processes multiple URLs in parallel
+   using concurrent.futures (multithreaded).
+
+   Default list of robots is now multirequests,curlmulti,aio.
+
    Make bkmk_rmultirequests always multiprocess.
 
 Version 6.0.0 (2024-08-19)
diff --git a/doc/ChangeLog b/doc/ChangeLog
index ad7fe9ce79cbf9cdd46b5097dcae9168c4f32ce6..36c1968c6e3db59c5c2c7a019864c232a6c95b1c 100644 (file)
--- a/doc/ChangeLog
+++ b/doc/ChangeLog
@@ -2,6 +2,11 @@ Version 6.1.0 (2024-??-??)
 
    Combine aiohttp with multiaio; the combined robot is named just aio.
 
+   Robot based on curl_multi, processes multiple URLs in parallel
+   using concurrent.futures (multithreaded).
+
+   Default list of robots is now multirequests,curlmulti,aio.
+
    Make bkmk_rmultirequests always multiprocess.
 
 Version 6.0.0 (2024-08-19)
diff --git a/robots.py b/robots.py
index 0bea1d462ba700dc4fe87b07162d23af0bd8cca1..23fce127229d12bd26dc73846c998206795361d2 100644 (file)
--- a/robots.py
+++ b/robots.py
@@ -16,7 +16,7 @@ from os import environ
 from bkmk_objects import parse_params, set_params
 
 robot_names, robot_params = parse_params(
-    environ.get("BKMK_ROBOT", "multirequests,multiaio,curl"))
+    environ.get("BKMK_ROBOT", "multirequests,curlmulti,aio"))
 
 
 def import_robot(robot_name):
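
With the default list changed, the new robot can also be selected explicitly
through BKMK_ROBOT, which robots.py reads at import time. A hypothetical
usage sketch; it assumes import_robot() returns the robot class, which is
not shown in this hunk:

import os
os.environ['BKMK_ROBOT'] = 'curlmulti'  # must be set before the import below

import robots  # parse_params() runs here and reads BKMK_ROBOT

print(robots.robot_names)  # expected: ['curlmulti']
robot = robots.import_robot(robots.robot_names[0])  # assumed to return the class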