"""Robot based on requests and concurrent.futures,
-processes multiple URLs in parallel.
+processes multiple URLs in parallel (multiprocess).
This file is a part of Bookmarks database and Internet robot.
import concurrent.futures
-import os
-from bkmk_objects import copy_bkmk
-from Robots.base import robot_base
-from Robots.multi_mixin import multi_mixin
+from Robots.concurrent_futures import concurrent_futures
from robots import import_robot, set_params, robot_params
-cpu_count = os.cpu_count()
-
-
-class robot_multirequests(multi_mixin, robot_base):
- # We're I/O-bound, not CPU-bound
- max_urls = 2*cpu_count if cpu_count else 10
-
+class robot_multirequests(concurrent_futures):
def __init__(self, *args, **kw):
- concurrent_class = concurrent.futures.ProcessPoolExecutor
- multi_mixin.__init__(self, *args, **kw)
- robot_base.__init__(self, *args, **kw)
- self.concurrent_class_name = concurrent_class.__name__
- self.executor = concurrent_class(max_workers=self.max_urls)
-
- def version_str(self):
- return super(robot_multirequests, self).version_str() \
- + ' multi: concurrent.futures.' + self.concurrent_class_name
-
- def wait(self):
- log = self.log
- bookmarks = self.bookmarks
- pending = self.pending
-
- free_workers = self.max_urls - len(pending)
- if bookmarks and (free_workers > 0): # we have job and free workers
- for href in bookmarks:
- bookmark, parent, ft = bookmarks[href]
- if ft is not None: # it's already pending
- continue
- parent = bookmark.parent
- del bookmark.parent # Prevent pickling the entire tree
- ft = self.executor.submit(
- worker_check_bookmark, bookmark)
- bookmarks[href] = [bookmark, parent, ft]
- pending.add(ft)
-
- free_workers -= 1
- if free_workers == 0:
- break
-
- if pending:
- done, pending = concurrent.futures.wait(
- pending, self.timeout+1,
- return_when=concurrent.futures.FIRST_COMPLETED)
- self.pending = pending
-
- for ft in done:
- new_bkmk, log_lines = ft.result()
- bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
- assert old_ft is ft
- if new_bkmk is not bookmark: # unpickled from a subprocess
- copy_bkmk(new_bkmk, bookmark)
- bookmark.parent = parent
- log('Checked: %s' % bookmark.href)
- for line in log_lines:
- log(line)
+ self.concurrent_class = concurrent.futures.ProcessPoolExecutor
+ concurrent_futures.__init__(self, *args, **kw)
- def stop(self):
- super(robot_multirequests, self).stop()
- self.executor.shutdown(wait=True)
+ def check_bkmk_task(self):
+ return check_bookmark_subproc
-def worker_check_bookmark(bookmark):
+def check_bookmark_subproc(bookmark):
log_lines = []
robot = import_robot('requests')
set_params(robot, robot_params)
--- /dev/null
+"""Mix-in for robots based on concurrent.futures, processes multiple URLs in parallel.
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['concurrent_futures']
+
+
+import concurrent.futures
+import os
+
+from bkmk_objects import copy_bkmk
+from Robots.base import robot_base
+from Robots.multi_mixin import multi_mixin
+
+
+cpu_count = os.cpu_count()
+
+
+class concurrent_futures(multi_mixin, robot_base):
+ # We're I/O-bound, not CPU-bound
+ max_urls = 2*cpu_count if cpu_count else 10
+
+ def __init__(self, *args, **kw):
+ self.concurrent_class_name = self.concurrent_class.__name__
+ multi_mixin.__init__(self, *args, **kw)
+ robot_base.__init__(self, *args, **kw)
+ self.executor = self.concurrent_class(max_workers=self.max_urls)
+
+ def version_str(self):
+ return super(concurrent_futures, self).version_str() \
+ + ' multi: concurrent.futures.' + self.concurrent_class_name
+
+ def wait(self):
+ log = self.log
+ bookmarks = self.bookmarks
+ pending = self.pending
+
+ free_workers = self.max_urls - len(pending)
+ if bookmarks and (free_workers > 0): # we have job and free workers
+ for href in bookmarks:
+ bookmark, parent, ft = bookmarks[href]
+ if ft is not None: # it's already pending
+ continue
+ parent = bookmark.parent
+ del bookmark.parent # Prevent pickling the entire tree
+ ft = self.executor.submit(self.check_bkmk_task(), bookmark)
+ bookmarks[href] = [bookmark, parent, ft]
+ pending.add(ft)
+
+ free_workers -= 1
+ if free_workers == 0:
+ break
+
+ if pending:
+ done, pending = concurrent.futures.wait(
+ pending, self.timeout+1,
+ return_when=concurrent.futures.FIRST_COMPLETED)
+ self.pending = pending
+
+ for ft in done:
+ new_bkmk, log_lines = ft.result()
+ bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
+ assert old_ft is ft
+ if new_bkmk is not bookmark: # unpickled from a subprocess
+ copy_bkmk(new_bkmk, bookmark)
+ bookmark.parent = parent
+ log('Checked: %s' % bookmark.href)
+ for line in log_lines:
+ log(line)
+
+ def stop(self):
+ super(concurrent_futures, self).stop()
+ self.executor.shutdown(wait=True)