]> git.phdru.name Git - bookmarks_db.git/commitdiff
Feat(Robots): Robot based on requests and concurrent.futures 5.6.0
authorOleg Broytman <phd@phdru.name>
Thu, 8 Aug 2024 13:25:33 +0000 (16:25 +0300)
committerOleg Broytman <phd@phdru.name>
Thu, 15 Aug 2024 17:44:48 +0000 (20:44 +0300)
Processes multiple URLs in parallel.

Robots/bkmk_rmultirequests.py [new file with mode: 0644]
doc/ANNOUNCE
doc/ChangeLog
robots.py

diff --git a/Robots/bkmk_rmultirequests.py b/Robots/bkmk_rmultirequests.py
new file mode 100644 (file)
index 0000000..b31c57a
--- /dev/null
@@ -0,0 +1,106 @@
+"""Robot based on requests and concurrent.futures,
+processes multiple URLs in parallel.
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_multirequests']
+
+
+import concurrent.futures
+import os
+
+from bkmk_objects import copy_bkmk
+from Robots.bkmk_rrequests import robot_requests
+from robots import import_robot, set_params, robot_params
+
+cpu_count = os.cpu_count()
+
+
+class robot_multirequests(robot_requests):
+    concurrent_class = concurrent.futures.ProcessPoolExecutor  # or ThreadPoolExecutor # noqa: E501 line too long
+    # We're I/O-bound, not CPU-bound
+    max_workers = 2*cpu_count if cpu_count else 10
+
+    def __init__(self, *args, **kw):
+        if isinstance(self.max_workers, str):
+            self.max_workers = int(self.max_workers)
+        concurrent_class = getattr(concurrent.futures, self.concurrent_class) \
+            if isinstance(self.concurrent_class, str) \
+            else self.concurrent_class
+        self.concurrent_class_name = concurrent_class.__name__
+        robot_requests.__init__(self, *args, **kw)
+        self.executor = concurrent_class(max_workers=self.max_workers)
+
+        # Bookmarks waiting to be processed;
+        # maps {URL: [bookmark, saved parent, future]}
+        self.bookmarks = {}
+        self.pending = set()  # pending futures
+
+    def version_str(self):
+        return super(robot_multirequests, self).version_str() \
+            + ' concurrent.futures.' + self.concurrent_class_name
+
+    def check_bookmark(self, bookmark):
+        href = bookmark.href
+        bookmarks = self.bookmarks
+        if href in bookmarks:
+            return
+        bookmarks[href] = [bookmark, None, None]
+        if len(bookmarks) < self.max_workers:
+            return
+        self.wait()
+
+    def wait(self):
+        log = self.log
+        bookmarks = self.bookmarks
+        pending = self.pending
+
+        process = self.max_workers - len(pending)
+        if bookmarks and (process > 0):  # there's job and free workers
+            for href in bookmarks:
+                bookmark, parent, ft = bookmarks[href]
+                if ft is not None:  # it's already pending
+                    continue
+                parent = bookmark.parent
+                del bookmark.parent  # Prevent pickling the entire tree
+                ft = self.executor.submit(
+                    worker_check_bookmark, bookmark)
+                bookmarks[href] = [bookmark, parent, ft]
+                pending.add(ft)
+
+        if pending:
+            done, pending = concurrent.futures.wait(
+                pending, self.timeout+1,
+                return_when=concurrent.futures.FIRST_COMPLETED)
+
+        for ft in done:
+            new_bkmk, log_lines = ft.result()
+            bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
+            assert old_ft is ft
+            if new_bkmk is not bookmark:  # unpickled from a subprocess
+                copy_bkmk(new_bkmk, bookmark)
+            bookmark.parent = parent
+            log('Checked: %s' % bookmark.href)
+            for line in log_lines:
+                log(line)
+
+        self.pending = pending
+
+    def stop(self):
+        while self.bookmarks or self.pending:
+            self.wait()
+        self.executor.shutdown(wait=True)
+
+
+def worker_check_bookmark(bookmark):
+    log_lines = []
+    robot = import_robot('requests')
+    set_params(robot, robot_params)
+    robot(log_lines.append).check_bookmark(bookmark)
+    return bookmark, log_lines
index 7161011499603f46d8ebe6989f8294efcfb2f18a..9f1157811dbdb3c9e07d8be471828457b7bf0151 100644 (file)
@@ -7,7 +7,11 @@ bookmarks.html.
 
 WHAT'S NEW
 
-Version 5.6.0 (2024-??-??)
+Version 5.6.0 (2024-08-15)
+
+   Robot based on requests and concurrent.futures,
+   processes multiple URLs in parallel. Multiprocess variant works
+   very well, multithreading not so good (too many sites report errors).
 
    Removed urllib-based robots.
 
index 38ecb852bd4f9a59423639e8002b7f112d265103..a1748f063d986228c4dc19fe83714adeda7f0b8a 100644 (file)
@@ -1,11 +1,15 @@
-Version 5.6.0 (2024-??-??)
+Version 5.6.0 (2024-08-15)
+
+   Robot based on requests and concurrent.futures,
+   processes multiple URLs in parallel. Multiprocess variant works
+   very well, multithreading not so good (too many sites report errors).
+
+   Default list of robots is now multirequests,curl,requests,aiohttp.
 
    Removed urllib-based robots.
 
    Dropped support for Python 2.
 
-   Default list of robots is now curl,requests,aiohttp.
-
 Version 5.5.1 (2024-08-??)
 
    Use aioftp in aiohttp robot.
index 3a9491db8a7f08c90616fe994d47e3fa46458191..583d1a8819d62c747f5464e8ed0f7ffc347c78b2 100644 (file)
--- a/robots.py
+++ b/robots.py
@@ -16,7 +16,7 @@ from os import environ
 from bkmk_objects import parse_params, set_params
 
 robot_names, robot_params = parse_params(
-    environ.get("BKMK_ROBOT", "curl,requests,aiohttp"))
+    environ.get("BKMK_ROBOT", "multirequests,curl,requests,aiohttp"))
 
 
 def import_robot(robot_name):