From 6ddc8b71efb0c4e3ef6e1835f4fec3f48a3adeda Mon Sep 17 00:00:00 2001 From: Oleg Broytman Date: Mon, 9 Sep 2024 23:25:23 +0300 Subject: [PATCH] Refactor(Robots): Split `robot_curl` into `cf_multithread` --- Robots/bkmk_rcurl.py | 56 +++++------------------------------- Robots/concurrent_futures.py | 48 ++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 50 deletions(-) diff --git a/Robots/bkmk_rcurl.py b/Robots/bkmk_rcurl.py index 553dc7f..c304454 100644 --- a/Robots/bkmk_rcurl.py +++ b/Robots/bkmk_rcurl.py @@ -12,60 +12,23 @@ __license__ = "GNU GPL" __all__ = ['robot_curl'] -from queue import Queue from time import sleep -import concurrent.futures -import threading import certifi import pycurl -from Robots.base import robot_base, encode_url -from Robots.concurrent_futures import concurrent_futures +from Robots.base import encode_url +from Robots.concurrent_futures import cf_multithread -class robot_curl(concurrent_futures): - def __init__(self, *args, **kw): - self.concurrent_class = concurrent.futures.ThreadPoolExecutor - concurrent_futures.__init__(self, *args, **kw) - # - # Rename self.log, create one log_lines list per URL (thread) - self.file_log = self.log - del self.log - self.local = threading.local() - - # Queue to pass requests to the main loop - self.queue = Queue() - - # Main loop: run curl_multi - self.executor.submit(self.curl_multi) - - def __getattr__(self, attr): - if attr != 'log': - raise AttributeError(attr) - try: - return self.local.log.append - except AttributeError: - return self.file_log - +class robot_curl(cf_multithread): def version_str(self): - return str(pycurl.version) - - def check_bkmk_task(self): - return self.check_bkmk_thread + return super(cf_multithread, self).version_str() \ + + '; ' + str(pycurl.version) - def check_bkmk_thread(self, bookmark): - self.local.log = log = [] - robot_base.check_bookmark(self, bookmark) - return bookmark, log + def main_thread(self): + """Main loop: run curl_multi""" - async def get(self, url, req_headers, use_proxy=False): - # Queue to wait results - queue = Queue(maxsize=1) - self.queue.put((url, req_headers, use_proxy, queue)) - return queue.get() - - def curl_multi(self): _curl_multi = pycurl.CurlMulti() _curl_multi.wrappers = {} @@ -102,11 +65,6 @@ class robot_curl(concurrent_futures): if num_handles == 0: sleep(1) - def wait(self): - super(robot_curl, self).wait() - if not self.bookmarks and not self.pending: - self.queue.put(None) # Signal to stop - def get_ftp_welcome(self): return '' # We don't store welcome message yet diff --git a/Robots/concurrent_futures.py b/Robots/concurrent_futures.py index 11dd034..323ae38 100644 --- a/Robots/concurrent_futures.py +++ b/Robots/concurrent_futures.py @@ -11,8 +11,10 @@ __license__ = "GNU GPL" __all__ = ['concurrent_futures'] +from queue import Queue import concurrent.futures import os +import threading from bkmk_objects import copy_bkmk from Robots.base import robot_base @@ -34,7 +36,7 @@ class concurrent_futures(multi_mixin, robot_base): def version_str(self): return super(concurrent_futures, self).version_str() \ - + ' multi: concurrent.futures.' + self.concurrent_class_name + + '; multi: concurrent.futures.' + self.concurrent_class_name def wait(self): log = self.log @@ -83,3 +85,47 @@ class cf_multiprocess(concurrent_futures): def __init__(self, *args, **kw): self.concurrent_class = concurrent.futures.ProcessPoolExecutor concurrent_futures.__init__(self, *args, **kw) + + +class cf_multithread(concurrent_futures): + def __init__(self, *args, **kw): + self.concurrent_class = concurrent.futures.ThreadPoolExecutor + concurrent_futures.__init__(self, *args, **kw) + + # Rename self.log, create one log_lines list per URL (thread) + self.file_log = self.log + del self.log + self.local = threading.local() + + # Queue to pass requests to the main loop + self.queue = Queue() + + # Main loop + self.executor.submit(self.main_thread) + + def __getattr__(self, attr): + if attr != 'log': + raise AttributeError(attr) + try: + return self.local.log.append + except AttributeError: + return self.file_log + + def check_bkmk_task(self): + return self.check_bkmk_thread + + def check_bkmk_thread(self, bookmark): + self.local.log = log = [] + robot_base.check_bookmark(self, bookmark) + return bookmark, log + + async def get(self, url, req_headers, use_proxy=False): + # Queue to wait results + queue = Queue(maxsize=1) + self.queue.put((url, req_headers, use_proxy, queue)) + return queue.get() + + def wait(self): + super(cf_multithread, self).wait() + if not self.bookmarks and not self.pending: + self.queue.put(None) # Signal to stop -- 2.39.5