--- /dev/null
+"""Robot based on curl_multi and concurrent.futures,
+processes multiple URLs in parallel (multithreaded).
+
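+Every bookmark is checked in its own worker thread: get() posts the
+request to a shared queue and blocks on a one-shot reply queue, while
+a dedicated curl_multi loop running in the same executor performs all
+transfers and posts the results back.
+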
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_curlmulti']
+
+
+from queue import Queue, Empty as QueueEmpty
+from time import sleep
+import concurrent.futures
+import threading
+
+import pycurl
+
+from Robots.base import robot_base
+from Robots.concurrent_futures import concurrent_futures
+from Robots.bkmk_rcurl import CurlWrapper
+
+
+class robot_curlmulti(concurrent_futures):
+ def __init__(self, *args, **kw):
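+        # concurrent_futures is expected to build self.executor
+        # from concurrent_class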
+ self.concurrent_class = concurrent.futures.ThreadPoolExecutor
+ concurrent_futures.__init__(self, *args, **kw)
+        # Rename self.log; a separate list of log lines is created
+        # per URL (i.e. per thread) in check_bkmk_thread()
+ self.file_log = self.log
+ del self.log
+ self.local = threading.local()
+
+ # Queue to pass requests to the main loop
+ self.queue = Queue()
+
+ # Main loop: run curl_multi
+ self.executor.submit(self.curl_multi)
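+        # N.B. curl_multi() runs until stopped and thus occupies
+        # one thread of the pool permanently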
+
+ def __getattr__(self, attr):
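+        # self.log was deleted in __init__, so lookups fall through
+        # here: threads that set self.local.log get the per-thread
+        # list's append method, all others get the file log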
+ if attr != 'log':
+ raise AttributeError(attr)
+ try:
+ return self.local.log.append
+ except AttributeError:
+ return self.file_log
+
+ def check_bkmk_task(self):
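+        # The task the base class runs in the executor, one per bookmark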
+ return self.check_bkmk_thread
+
+ def check_bkmk_thread(self, bookmark):
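+        # Collect this thread's log lines into a local list
+        # and return them along with the bookmark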
+ self.local.log = log = []
+ robot_base.check_bookmark(self, bookmark)
+ return bookmark, log
+
+    async def get(self, url, req_headers, use_proxy=False):
+        # One-shot queue to wait for the result
+        queue = Queue(maxsize=1)
+        self.queue.put((url, req_headers, use_proxy, queue))
+        # Block this worker thread until curl_multi() posts the result
+        return queue.get()
+
+ def curl_multi(self):
+ _curl_multi = pycurl.CurlMulti()
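+        # Maps a curl handle to its (CurlWrapper, reply queue) pair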
+ _curl_multi.wrappers = {}
+
+ while True:
+            # Pick up a new request from the worker threads, if any
+            try:
+                request = self.queue.get_nowait()
+            except QueueEmpty:
+                pass
+            else:
+                if request is None:  # Signal to stop
+                    return
+                url, req_headers, use_proxy, queue = request
+                cw = CurlWrapper(self, url, req_headers, use_proxy)
+                _curl_multi.wrappers[cw.curl] = (cw, queue)
+                _curl_multi.add_handle(cw.curl)
+
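+            # Wait (at most 1 second) for activity on the transfers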
+ _curl_multi.select(1.0)
+ while True:
+ ret, num_handles = _curl_multi.perform()
+ _, completed, failed = _curl_multi.info_read()
+ for c in completed:
+ _curl_multi.remove_handle(c)
+ cw, queue = _curl_multi.wrappers.pop(c)
+                    # The same tuple format that get() returns:
+                    # (error, status, resp_headers, body)
+                    queue.put_nowait(
+                        (None, cw.curl.getinfo(pycurl.HTTP_CODE),
+                         cw.resp_headers, cw.body)
+                    )
+ for c, errcode, errmsg in failed:
+ _curl_multi.remove_handle(c)
+ cw, queue = _curl_multi.wrappers.pop(c)
+ queue.put_nowait(('Error: %d %s' % (errcode, errmsg),
+ None, None, None))
+                if ret != pycurl.E_CALL_MULTI_PERFORM:
+                    break  # perform() needs no immediate re-call
+            if num_handles == 0:
+                sleep(1)  # No active transfers; avoid busy-looping
+
+ def wait(self):
+        super().wait()
+ if not self.bookmarks and not self.pending:
+ self.queue.put(None) # Signal to stop