git.phdru.name Git - bookmarks_db.git/commitdiff
Refactor(Robots): Split `bkmk_rmultirequests` into `concurrent_futures` mix-in
author Oleg Broytman <phd@phdru.name>
Thu, 5 Sep 2024 14:32:55 +0000 (17:32 +0300)
committer Oleg Broytman <phd@phdru.name>
Sat, 7 Sep 2024 14:30:02 +0000 (17:30 +0300)
Robots/bkmk_rmultirequests.py
Robots/concurrent_futures.py [new file with mode: 0644]

index fc66f4e032994be75a20926fc83c26d53d77eeb1..39daf9c381fca93111cf78bb80356c8da9e6cd9b 100644 (file)
@@ -1,5 +1,5 @@
 """Robot based on requests and concurrent.futures,
-processes multiple URLs in parallel.
+processes multiple URLs in parallel (multiprocess).
 
 This file is a part of Bookmarks database and Internet robot.
 
@@ -13,77 +13,21 @@ __all__ = ['robot_multirequests']
 
 
 import concurrent.futures
-import os
 
-from bkmk_objects import copy_bkmk
-from Robots.base import robot_base
-from Robots.multi_mixin import multi_mixin
+from Robots.concurrent_futures import concurrent_futures
 from robots import import_robot, set_params, robot_params
 
 
-cpu_count = os.cpu_count()
-
-
-class robot_multirequests(multi_mixin, robot_base):
-    # We're I/O-bound, not CPU-bound
-    max_urls = 2*cpu_count if cpu_count else 10
-
+class robot_multirequests(concurrent_futures):
     def __init__(self, *args, **kw):
-        concurrent_class = concurrent.futures.ProcessPoolExecutor
-        multi_mixin.__init__(self, *args, **kw)
-        robot_base.__init__(self, *args, **kw)
-        self.concurrent_class_name = concurrent_class.__name__
-        self.executor = concurrent_class(max_workers=self.max_urls)
-
-    def version_str(self):
-        return super(robot_multirequests, self).version_str() \
-            + ' multi: concurrent.futures.' + self.concurrent_class_name
-
-    def wait(self):
-        log = self.log
-        bookmarks = self.bookmarks
-        pending = self.pending
-
-        free_workers = self.max_urls - len(pending)
-        if bookmarks and (free_workers > 0):  # we have job and free workers
-            for href in bookmarks:
-                bookmark, parent, ft = bookmarks[href]
-                if ft is not None:  # it's already pending
-                    continue
-                parent = bookmark.parent
-                del bookmark.parent  # Prevent pickling the entire tree
-                ft = self.executor.submit(
-                    worker_check_bookmark, bookmark)
-                bookmarks[href] = [bookmark, parent, ft]
-                pending.add(ft)
-
-                free_workers -= 1
-                if free_workers == 0:
-                    break
-
-        if pending:
-            done, pending = concurrent.futures.wait(
-                pending, self.timeout+1,
-                return_when=concurrent.futures.FIRST_COMPLETED)
-        self.pending = pending
-
-        for ft in done:
-            new_bkmk, log_lines = ft.result()
-            bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
-            assert old_ft is ft
-            if new_bkmk is not bookmark:  # unpickled from a subprocess
-                copy_bkmk(new_bkmk, bookmark)
-            bookmark.parent = parent
-            log('Checked: %s' % bookmark.href)
-            for line in log_lines:
-                log(line)
+        self.concurrent_class = concurrent.futures.ProcessPoolExecutor
+        concurrent_futures.__init__(self, *args, **kw)
 
-    def stop(self):
-        super(robot_multirequests, self).stop()
-        self.executor.shutdown(wait=True)
+    def check_bkmk_task(self):
+        return check_bookmark_subproc
 
 
-def worker_check_bookmark(bookmark):
+def check_bookmark_subproc(bookmark):
     log_lines = []
     robot = import_robot('requests')
     set_params(robot, robot_params)
diff --git a/Robots/concurrent_futures.py b/Robots/concurrent_futures.py
new file mode 100644 (file)
index 0000000..7e4a165
--- /dev/null
@@ -0,0 +1,79 @@
+"""Mix-in for robots based on concurrent.futures, processes multiple URLs in parallel.
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['concurrent_futures']
+
+
+import concurrent.futures
+import os
+
+from bkmk_objects import copy_bkmk
+from Robots.base import robot_base
+from Robots.multi_mixin import multi_mixin
+
+
+cpu_count = os.cpu_count()
+
+
+class concurrent_futures(multi_mixin, robot_base):
+    # We're I/O-bound, not CPU-bound
+    max_urls = 2*cpu_count if cpu_count else 10
+
+    def __init__(self, *args, **kw):
+        self.concurrent_class_name = self.concurrent_class.__name__
+        multi_mixin.__init__(self, *args, **kw)
+        robot_base.__init__(self, *args, **kw)
+        self.executor = self.concurrent_class(max_workers=self.max_urls)
+
+    def version_str(self):
+        return super(concurrent_futures, self).version_str() \
+            + ' multi: concurrent.futures.' + self.concurrent_class_name
+
+    def wait(self):
+        log = self.log
+        bookmarks = self.bookmarks
+        pending = self.pending
+
+        free_workers = self.max_urls - len(pending)
+        if bookmarks and (free_workers > 0):  # we have job and free workers
+            for href in bookmarks:
+                bookmark, parent, ft = bookmarks[href]
+                if ft is not None:  # it's already pending
+                    continue
+                parent = bookmark.parent
+                del bookmark.parent  # Prevent pickling the entire tree
+                ft = self.executor.submit(self.check_bkmk_task(), bookmark)
+                bookmarks[href] = [bookmark, parent, ft]
+                pending.add(ft)
+
+                free_workers -= 1
+                if free_workers == 0:
+                    break
+
+        if pending:
+            done, pending = concurrent.futures.wait(
+                pending, self.timeout+1,
+                return_when=concurrent.futures.FIRST_COMPLETED)
+        self.pending = pending
+
+        for ft in done:
+            new_bkmk, log_lines = ft.result()
+            bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
+            assert old_ft is ft
+            if new_bkmk is not bookmark:  # unpickled from a subprocess
+                copy_bkmk(new_bkmk, bookmark)
+            bookmark.parent = parent
+            log('Checked: %s' % bookmark.href)
+            for line in log_lines:
+                log(line)
+
+    def stop(self):
+        super(concurrent_futures, self).stop()
+        self.executor.shutdown(wait=True)