git.phdru.name Git - bookmarks_db.git/commitdiff
Refactor(Robots): Split `robot_curl` into `cf_multithread`
author    Oleg Broytman <phd@phdru.name>
          Mon, 9 Sep 2024 20:25:23 +0000 (23:25 +0300)
committer Oleg Broytman <phd@phdru.name>
          Mon, 9 Sep 2024 20:26:04 +0000 (23:26 +0300)
Robots/bkmk_rcurl.py
Robots/concurrent_futures.py
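
The commit pulls the thread-pool plumbing (per-thread logs, the request queue and the background main loop) out of `robot_curl` into a new `cf_multithread` base class in Robots/concurrent_futures.py, leaving only the pycurl-specific code in Robots/bkmk_rcurl.py. Below is a minimal, self-contained sketch of the resulting class layout as far as it can be read from the diffs; the stub bases and docstrings are stand-ins, not the project's real code.

# Sketch only: stand-ins for bases defined elsewhere in the repository.
class robot_base: ...        # real class: Robots.base.robot_base
class multi_mixin: ...       # real class: the multi_mixin used by the module

class concurrent_futures(multi_mixin, robot_base):
    """Common executor setup, version_str() and wait() logic."""

class cf_multiprocess(concurrent_futures):
    """Picks concurrent.futures.ProcessPoolExecutor."""

class cf_multithread(concurrent_futures):
    """Picks ThreadPoolExecutor; owns the request queue and the
    per-thread logs, and submits self.main_thread() to the executor."""
    def main_thread(self):
        raise NotImplementedError  # each subclass runs its own main loop

class robot_curl(cf_multithread):  # Robots/bkmk_rcurl.py
    """Keeps only the pycurl side: version_str() and a main_thread()
    that drives pycurl.CurlMulti."""
    def main_thread(self):
        pass  # the real loop is shown in the diff below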

diff --git a/Robots/bkmk_rcurl.py b/Robots/bkmk_rcurl.py
index 553dc7f0351d79529a3ef847040376d86d127bb1..c3044546a8daaf988d7f76cbec4fa250c3bcb20b 100644
--- a/Robots/bkmk_rcurl.py
+++ b/Robots/bkmk_rcurl.py
@@ -12,60 +12,23 @@ __license__ = "GNU GPL"
 __all__ = ['robot_curl']
 
 
-from queue import Queue
 from time import sleep
-import concurrent.futures
-import threading
 
 import certifi
 import pycurl
 
-from Robots.base import robot_base, encode_url
-from Robots.concurrent_futures import concurrent_futures
+from Robots.base import encode_url
+from Robots.concurrent_futures import cf_multithread
 
 
-class robot_curl(concurrent_futures):
-    def __init__(self, *args, **kw):
-        self.concurrent_class = concurrent.futures.ThreadPoolExecutor
-        concurrent_futures.__init__(self, *args, **kw)
-        #
-        # Rename self.log, create one log_lines list per URL (thread)
-        self.file_log = self.log
-        del self.log
-        self.local = threading.local()
-
-        # Queue to pass requests to the main loop
-        self.queue = Queue()
-
-        # Main loop: run curl_multi
-        self.executor.submit(self.curl_multi)
-
-    def __getattr__(self, attr):
-        if attr != 'log':
-            raise AttributeError(attr)
-        try:
-            return self.local.log.append
-        except AttributeError:
-            return self.file_log
-
+class robot_curl(cf_multithread):
     def version_str(self):
-        return str(pycurl.version)
-
-    def check_bkmk_task(self):
-        return self.check_bkmk_thread
+        return super(cf_multithread, self).version_str() \
+            + '; ' + str(pycurl.version)
 
-    def check_bkmk_thread(self, bookmark):
-        self.local.log = log = []
-        robot_base.check_bookmark(self, bookmark)
-        return bookmark, log
+    def main_thread(self):
+        """Main loop: run curl_multi"""
 
-    async def get(self, url, req_headers, use_proxy=False):
-        # Queue to wait results
-        queue = Queue(maxsize=1)
-        self.queue.put((url, req_headers, use_proxy, queue))
-        return queue.get()
-
-    def curl_multi(self):
         _curl_multi = pycurl.CurlMulti()
         _curl_multi.wrappers = {}
 
@@ -102,11 +65,6 @@ class robot_curl(concurrent_futures):
             if num_handles == 0:
                 sleep(1)
 
-    def wait(self):
-        super(robot_curl, self).wait()
-        if not self.bookmarks and not self.pending:
-            self.queue.put(None)  # Signal to stop
-
     def get_ftp_welcome(self):
         return ''  # We don't store welcome message yet
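
The curl_multi loop that used to live here is now the body of main_thread(), but the handshake it serves is unchanged: each worker thread's get() puts a (url, req_headers, use_proxy, reply_queue) tuple on self.queue and blocks on its one-slot reply queue, and wait() pushes None as the stop signal once nothing is pending. A stand-alone sketch of that pattern follows; the names (serve_requests, handle, get) are illustrative stand-ins, not the project's API.

from queue import Queue
from threading import Thread

requests = Queue()                 # shared request queue (like self.queue)

def serve_requests(handle):
    """Main loop: pop requests until a None stop signal arrives."""
    while True:
        item = requests.get()
        if item is None:           # stop signal, like wait() putting None
            break
        payload, reply_queue = item
        reply_queue.put(handle(payload))

def get(payload):
    """Worker side: hand the request to the main loop, wait for the answer."""
    reply_queue = Queue(maxsize=1) # private queue to wait for the result
    requests.put((payload, reply_queue))
    return reply_queue.get()

main = Thread(target=serve_requests, args=(str.upper,))
main.start()
workers = [Thread(target=lambda u: print(get(u)), args=(url,))
           for url in ('http://a.example', 'http://b.example')]
for w in workers: w.start()
for w in workers: w.join()
requests.put(None)                 # tell the main loop to finish
main.join()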
 
diff --git a/Robots/concurrent_futures.py b/Robots/concurrent_futures.py
index 11dd0347ba1d949e8fda08d0daff13e78ff47f75..323ae388e1b9b954ac90d53dc04183414524e363 100644
--- a/Robots/concurrent_futures.py
+++ b/Robots/concurrent_futures.py
@@ -11,8 +11,10 @@ __license__ = "GNU GPL"
 __all__ = ['concurrent_futures']
 
 
+from queue import Queue
 import concurrent.futures
 import os
+import threading
 
 from bkmk_objects import copy_bkmk
 from Robots.base import robot_base
@@ -34,7 +36,7 @@ class concurrent_futures(multi_mixin, robot_base):
 
     def version_str(self):
         return super(concurrent_futures, self).version_str() \
-            + ' multi: concurrent.futures.' + self.concurrent_class_name
+            + '; multi: concurrent.futures.' + self.concurrent_class_name
 
     def wait(self):
         log = self.log
@@ -83,3 +85,47 @@ class cf_multiprocess(concurrent_futures):
     def __init__(self, *args, **kw):
         self.concurrent_class = concurrent.futures.ProcessPoolExecutor
         concurrent_futures.__init__(self, *args, **kw)
+
+
+class cf_multithread(concurrent_futures):
+    def __init__(self, *args, **kw):
+        self.concurrent_class = concurrent.futures.ThreadPoolExecutor
+        concurrent_futures.__init__(self, *args, **kw)
+
+        # Rename self.log, create one log_lines list per URL (thread)
+        self.file_log = self.log
+        del self.log
+        self.local = threading.local()
+
+        # Queue to pass requests to the main loop
+        self.queue = Queue()
+
+        # Main loop
+        self.executor.submit(self.main_thread)
+
+    def __getattr__(self, attr):
+        if attr != 'log':
+            raise AttributeError(attr)
+        try:
+            return self.local.log.append
+        except AttributeError:
+            return self.file_log
+
+    def check_bkmk_task(self):
+        return self.check_bkmk_thread
+
+    def check_bkmk_thread(self, bookmark):
+        self.local.log = log = []
+        robot_base.check_bookmark(self, bookmark)
+        return bookmark, log
+
+    async def get(self, url, req_headers, use_proxy=False):
+        # Queue to wait results
+        queue = Queue(maxsize=1)
+        self.queue.put((url, req_headers, use_proxy, queue))
+        return queue.get()
+
+    def wait(self):
+        super(cf_multithread, self).wait()
+        if not self.bookmarks and not self.pending:
+            self.queue.put(None)  # Signal to stop
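
The __getattr__ trick above gives every worker thread its own log: check_bkmk_thread stores a fresh list in threading.local(), so self.log resolves to that list's append inside a worker and falls back to the file-level log everywhere else. A stand-alone sketch of the same idea, assuming nothing beyond the standard library; the class and method names are hypothetical.

import threading

class PerThreadLog:
    def __init__(self, file_log):
        self.file_log = file_log            # fallback logger, e.g. print or a file write
        self.local = threading.local()      # holds one log list per worker thread

    def __getattr__(self, attr):            # only reached when 'log' is not a real attribute
        if attr != 'log':
            raise AttributeError(attr)
        try:
            return self.local.lines.append  # worker thread: append to its own list
        except AttributeError:
            return self.file_log            # no per-thread list yet: use the shared logger

    def check(self, name):
        self.local.lines = lines = []       # fresh log for this thread/URL
        self.log('checking %s' % name)      # lands in the per-thread list
        return name, lines

logger = PerThreadLog(print)
t = threading.Thread(target=lambda: print(logger.check('http://a.example')))
t.start(); t.join()                         # the worker's log comes back with its result
logger.log('done')                          # main thread never set lines: prints directly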