From d844634fefb0df62aa2ca9464ef1c2cf8f5cd15a Mon Sep 17 00:00:00 2001
From: Oleg Broytman
Date: Thu, 5 Sep 2024 17:32:55 +0300
Subject: [PATCH] Refactor(Robots): Split `bkmk_rmultirequests` into `concurrent_futures` mix-in

---
Review notes (this area is ignored when the patch is applied):
* The author's e-mail address appears to be missing from the "From:" header
  and from `__author__` in the new file (presumably stripped in transit --
  please confirm and restore it before feeding this to `git am`).
* `concurrent_futures.__init__` reads `self.concurrent_class` and `wait()`
  calls `self.check_bkmk_task()`; the mix-in defines neither, so every
  subclass has to provide both before calling the base initializer, as
  `robot_multirequests` does.

 Robots/bkmk_rmultirequests.py | 72 ++++---------------------------
 Robots/concurrent_futures.py  | 79 +++++++++++++++++++++++++++++++++++
 2 files changed, 87 insertions(+), 64 deletions(-)
 create mode 100644 Robots/concurrent_futures.py

diff --git a/Robots/bkmk_rmultirequests.py b/Robots/bkmk_rmultirequests.py
index fc66f4e..39daf9c 100644
--- a/Robots/bkmk_rmultirequests.py
+++ b/Robots/bkmk_rmultirequests.py
@@ -1,5 +1,5 @@
 """Robot based on requests and concurrent.futures,
-processes multiple URLs in parallel.
+processes multiple URLs in parallel (multiprocess).
 
 This file is a part of Bookmarks database and Internet robot.
 
@@ -13,77 +13,21 @@ __all__ = ['robot_multirequests']
 
 
 import concurrent.futures
-import os
 
-from bkmk_objects import copy_bkmk
-from Robots.base import robot_base
-from Robots.multi_mixin import multi_mixin
+from Robots.concurrent_futures import concurrent_futures
 from robots import import_robot, set_params, robot_params
 
 
-cpu_count = os.cpu_count()
-
-
-class robot_multirequests(multi_mixin, robot_base):
-    # We're I/O-bound, not CPU-bound
-    max_urls = 2*cpu_count if cpu_count else 10
-
+class robot_multirequests(concurrent_futures):
     def __init__(self, *args, **kw):
-        concurrent_class = concurrent.futures.ProcessPoolExecutor
-        multi_mixin.__init__(self, *args, **kw)
-        robot_base.__init__(self, *args, **kw)
-        self.concurrent_class_name = concurrent_class.__name__
-        self.executor = concurrent_class(max_workers=self.max_urls)
-
-    def version_str(self):
-        return super(robot_multirequests, self).version_str() \
-            + ' multi: concurrent.futures.' + self.concurrent_class_name
-
-    def wait(self):
-        log = self.log
-        bookmarks = self.bookmarks
-        pending = self.pending
-
-        free_workers = self.max_urls - len(pending)
-        if bookmarks and (free_workers > 0):  # we have job and free workers
-            for href in bookmarks:
-                bookmark, parent, ft = bookmarks[href]
-                if ft is not None:  # it's already pending
-                    continue
-                parent = bookmark.parent
-                del bookmark.parent  # Prevent pickling the entire tree
-                ft = self.executor.submit(
-                    worker_check_bookmark, bookmark)
-                bookmarks[href] = [bookmark, parent, ft]
-                pending.add(ft)
-
-                free_workers -= 1
-                if free_workers == 0:
-                    break
-
-        if pending:
-            done, pending = concurrent.futures.wait(
-                pending, self.timeout+1,
-                return_when=concurrent.futures.FIRST_COMPLETED)
-            self.pending = pending
-
-            for ft in done:
-                new_bkmk, log_lines = ft.result()
-                bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
-                assert old_ft is ft
-                if new_bkmk is not bookmark:  # unpickled from a subprocess
-                    copy_bkmk(new_bkmk, bookmark)
-                bookmark.parent = parent
-                log('Checked: %s' % bookmark.href)
-                for line in log_lines:
-                    log(line)
+        self.concurrent_class = concurrent.futures.ProcessPoolExecutor
+        concurrent_futures.__init__(self, *args, **kw)
 
-    def stop(self):
-        super(robot_multirequests, self).stop()
-        self.executor.shutdown(wait=True)
+    def check_bkmk_task(self):
+        return check_bookmark_subproc
 
 
-def worker_check_bookmark(bookmark):
+def check_bookmark_subproc(bookmark):
     log_lines = []
     robot = import_robot('requests')
     set_params(robot, robot_params)
diff --git a/Robots/concurrent_futures.py b/Robots/concurrent_futures.py
new file mode 100644
index 0000000..7e4a165
--- /dev/null
+++ b/Robots/concurrent_futures.py
@@ -0,0 +1,79 @@
+"""Mix-in for robots based on concurrent.futures, processes multiple URLs in parallel.
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman "
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['concurrent_futures']
+
+
+import concurrent.futures
+import os
+
+from bkmk_objects import copy_bkmk
+from Robots.base import robot_base
+from Robots.multi_mixin import multi_mixin
+
+
+cpu_count = os.cpu_count()
+
+
+class concurrent_futures(multi_mixin, robot_base):
+    # We're I/O-bound, not CPU-bound
+    max_urls = 2*cpu_count if cpu_count else 10
+
+    def __init__(self, *args, **kw):
+        self.concurrent_class_name = self.concurrent_class.__name__
+        multi_mixin.__init__(self, *args, **kw)
+        robot_base.__init__(self, *args, **kw)
+        self.executor = self.concurrent_class(max_workers=self.max_urls)
+
+    def version_str(self):
+        return super(concurrent_futures, self).version_str() \
+            + ' multi: concurrent.futures.' + self.concurrent_class_name
+
+    def wait(self):
+        log = self.log
+        bookmarks = self.bookmarks
+        pending = self.pending
+
+        free_workers = self.max_urls - len(pending)
+        if bookmarks and (free_workers > 0):  # we have job and free workers
+            for href in bookmarks:
+                bookmark, parent, ft = bookmarks[href]
+                if ft is not None:  # it's already pending
+                    continue
+                parent = bookmark.parent
+                del bookmark.parent  # Prevent pickling the entire tree
+                ft = self.executor.submit(self.check_bkmk_task(), bookmark)
+                bookmarks[href] = [bookmark, parent, ft]
+                pending.add(ft)
+
+                free_workers -= 1
+                if free_workers == 0:
+                    break
+
+        if pending:
+            done, pending = concurrent.futures.wait(
+                pending, self.timeout+1,
+                return_when=concurrent.futures.FIRST_COMPLETED)
+            self.pending = pending
+
+            for ft in done:
+                new_bkmk, log_lines = ft.result()
+                bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
+                assert old_ft is ft
+                if new_bkmk is not bookmark:  # unpickled from a subprocess
+                    copy_bkmk(new_bkmk, bookmark)
+                bookmark.parent = parent
+                log('Checked: %s' % bookmark.href)
+                for line in log_lines:
+                    log(line)
+
+    def stop(self):
+        super(concurrent_futures, self).stop()
+        self.executor.shutdown(wait=True)
-- 
2.39.5