From 7d9b0b67873e9feebcf55375583e6611b73968a2 Mon Sep 17 00:00:00 2001
From: Oleg Broytman <phd@phdru.name>
Date: Thu, 8 Aug 2024 16:25:33 +0300
Subject: [PATCH] Feat(Robots): Robot based on requests and concurrent.futures

Processes multiple URLs in parallel.
---
 Robots/bkmk_rmultirequests.py | 106 ++++++++++++++++++++++++++++++++++
 doc/ANNOUNCE                  |   6 +-
 doc/ChangeLog                 |  10 +++-
 robots.py                     |   2 +-
 4 files changed, 119 insertions(+), 5 deletions(-)
 create mode 100644 Robots/bkmk_rmultirequests.py

diff --git a/Robots/bkmk_rmultirequests.py b/Robots/bkmk_rmultirequests.py
new file mode 100644
index 0000000..b31c57a
--- /dev/null
+++ b/Robots/bkmk_rmultirequests.py
@@ -0,0 +1,106 @@
+"""Robot based on requests and concurrent.futures,
+processes multiple URLs in parallel.
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_multirequests']
+
+
+import concurrent.futures
+import os
+
+from bkmk_objects import copy_bkmk
+from Robots.bkmk_rrequests import robot_requests
+from robots import import_robot, set_params, robot_params
+
+cpu_count = os.cpu_count()
+
+
+class robot_multirequests(robot_requests):
+    concurrent_class = concurrent.futures.ProcessPoolExecutor  # or ThreadPoolExecutor  # noqa: E501 line too long
+    # We're I/O-bound, not CPU-bound
+    max_workers = 2*cpu_count if cpu_count else 10
+
+    def __init__(self, *args, **kw):
+        if isinstance(self.max_workers, str):
+            self.max_workers = int(self.max_workers)
+        concurrent_class = getattr(concurrent.futures, self.concurrent_class) \
+            if isinstance(self.concurrent_class, str) \
+            else self.concurrent_class
+        self.concurrent_class_name = concurrent_class.__name__
+        robot_requests.__init__(self, *args, **kw)
+        self.executor = concurrent_class(max_workers=self.max_workers)
+
+        # Bookmarks waiting to be processed;
+        # maps {URL: [bookmark, saved parent, future]}
+        self.bookmarks = {}
+        self.pending = set()  # pending futures
+
+    def version_str(self):
+        return super(robot_multirequests, self).version_str() \
+            + ' concurrent.futures.' + self.concurrent_class_name
+
+    def check_bookmark(self, bookmark):
+        href = bookmark.href
+        bookmarks = self.bookmarks
+        if href in bookmarks:
+            return
+        bookmarks[href] = [bookmark, None, None]
+        if len(bookmarks) < self.max_workers:
+            return
+        self.wait()
+
+    def wait(self):
+        log = self.log
+        bookmarks = self.bookmarks
+        pending = self.pending
+
+        process = self.max_workers - len(pending)
+        if bookmarks and (process > 0):  # there are jobs and free workers
+            for href in bookmarks:
+                bookmark, parent, ft = bookmarks[href]
+                if ft is not None:  # it's already pending
+                    continue
+                parent = bookmark.parent
+                del bookmark.parent  # Prevent pickling the entire tree
+                ft = self.executor.submit(
+                    worker_check_bookmark, bookmark)
+                bookmarks[href] = [bookmark, parent, ft]
+                pending.add(ft)
+
+        if pending:
+            done, pending = concurrent.futures.wait(
+                pending, self.timeout+1,
+                return_when=concurrent.futures.FIRST_COMPLETED)
+
+            for ft in done:
+                new_bkmk, log_lines = ft.result()
+                bookmark, parent, old_ft = bookmarks.pop(new_bkmk.href)
+                assert old_ft is ft
+                if new_bkmk is not bookmark:  # unpickled from a subprocess
+                    copy_bkmk(new_bkmk, bookmark)
+                bookmark.parent = parent
+                log('Checked: %s' % bookmark.href)
+                for line in log_lines:
+                    log(line)
+
+        self.pending = pending
+
+    def stop(self):
+        while self.bookmarks or self.pending:
+            self.wait()
+        self.executor.shutdown(wait=True)
+
+
+def worker_check_bookmark(bookmark):
+    log_lines = []
+    robot = import_robot('requests')
+    set_params(robot, robot_params)
+    robot(log_lines.append).check_bookmark(bookmark)
+    return bookmark, log_lines
diff --git a/doc/ANNOUNCE b/doc/ANNOUNCE
index 7161011..9f11578 100644
--- a/doc/ANNOUNCE
+++ b/doc/ANNOUNCE
@@ -7,7 +7,11 @@ bookmarks.html.
 
 WHAT'S NEW
 
-Version 5.6.0 (2024-??-??)
+Version 5.6.0 (2024-08-15)
+
+  Robot based on requests and concurrent.futures that processes
+  multiple URLs in parallel. The multiprocess variant works very well;
+  the multithreaded one is less reliable (too many sites report errors).
 
   Removed urllib-based robots.
 
diff --git a/doc/ChangeLog b/doc/ChangeLog
index 38ecb85..a1748f0 100644
--- a/doc/ChangeLog
+++ b/doc/ChangeLog
@@ -1,11 +1,15 @@
-Version 5.6.0 (2024-??-??)
+Version 5.6.0 (2024-08-15)
+
+  Robot based on requests and concurrent.futures that processes
+  multiple URLs in parallel. The multiprocess variant works very well;
+  the multithreaded one is less reliable (too many sites report errors).
+
+  Default list of robots is now multirequests,curl,requests,aiohttp.
 
   Removed urllib-based robots.
 
   Dropped support for Python 2.
 
-  Default list of robots is now curl,requests,aiohttp.
-
 Version 5.5.1 (2024-08-??)
 
   Use aioftp in aiohttp robot.
diff --git a/robots.py b/robots.py
index 3a9491d..583d1a8 100644
--- a/robots.py
+++ b/robots.py
@@ -16,7 +16,7 @@ from os import environ
 from bkmk_objects import parse_params, set_params
 
 robot_names, robot_params = parse_params(
-    environ.get("BKMK_ROBOT", "curl,requests,aiohttp"))
+    environ.get("BKMK_ROBOT", "multirequests,curl,requests,aiohttp"))
 
 
 def import_robot(robot_name):
-- 
2.39.5
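
P.S. For readers unfamiliar with the pattern the new robot is built on,
here is a minimal standalone sketch of the same submit/harvest loop:
jobs go into a ProcessPoolExecutor and finished futures are collected
with concurrent.futures.wait(..., return_when=FIRST_COMPLETED), much
like robot_multirequests.wait() above. check_url and the sample URLs
are hypothetical stand-ins for worker_check_bookmark and the bookmarks
dict; this is an illustration, not part of the patch.

    import concurrent.futures
    import os
    import urllib.request


    def check_url(url):
        # Runs in a worker process, like worker_check_bookmark;
        # returns the result together with accumulated log lines.
        log_lines = []
        try:
            with urllib.request.urlopen(url, timeout=10) as response:
                log_lines.append('%s: HTTP %d' % (url, response.status))
        except OSError as e:
            log_lines.append('%s: error: %s' % (url, e))
        return url, log_lines


    if __name__ == '__main__':
        urls = ['https://example.com/', 'https://example.org/']
        cpu_count = os.cpu_count()
        max_workers = 2*cpu_count if cpu_count else 10  # I/O-bound
        with concurrent.futures.ProcessPoolExecutor(
                max_workers=max_workers) as executor:
            pending = {executor.submit(check_url, url) for url in urls}
            while pending:
                # Harvest whatever finished first, keep the rest pending.
                done, pending = concurrent.futures.wait(
                    pending, timeout=11,
                    return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    url, log_lines = future.result()
                    for line in log_lines:
                        print(line)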