"""Robot based on aiohttp and concurrent.futures,
processes multiple URLs in parallel (multithreaded).

This file is a part of Bookmarks database and Internet robot.

"""

__author__ = "Oleg Broytman "
__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
__license__ = "GNU GPL"

__all__ = ['robot_multiaio']


import asyncio
import aiohttp

from Robots.bkmk_raio import _get_http, _get_ftp
from Robots.concurrent_futures import cf_multithread


class robot_multiaio(cf_multithread):
    """Robot that fetches URLs as asyncio tasks inside one event loop.

    Requests are read from ``self.queue``; every request carries its own
    reply queue on which the result tuple is put when the fetch is done.
    """

    # Seconds to wait before looking at ``self.queue`` again. Bounds both
    # the idle sleep and the wait for running tasks, so new requests are
    # picked up promptly without busy-waiting.
    async_poll_interval = 0.1

    def __init__(self, *args, **kw):
        # Set before the base initializer runs.
        # NOTE(review): presumably the base class may start main_thread()
        # from its __init__ -- confirm against cf_multithread.
        self.async_pending = set()  # pending async tasks
        cf_multithread.__init__(self, *args, **kw)

    def version_str(self):
        """Return the robot identification string with aiohttp version."""
        return 'aiohttp/%s; multithreaded' % aiohttp.__version__

    def main_thread(self):
        """Run the asynchronous main loop until it is told to stop."""
        asyncio.run(self.main_thread_async())

    async def main_thread_async(self):
        """Main loop: start a task per queued request, deliver results.

        Each item taken from ``self.queue`` is either ``None`` (stop
        signal, the coroutine returns) or a tuple
        ``(url, req_headers, use_proxy, queue)``. For every finished task
        ``(error, status, resp_headers, body)`` is put on that request's
        ``queue``.

        Tasks still pending when the stop signal arrives are not awaited
        here.
        """
        while True:
            # Start everything that is already queued, so that waiting
            # URLs are fetched concurrently instead of one new request
            # per loop iteration.
            while not self.queue.empty():
                request = self.queue.get_nowait()
                if request is None:  # Signal to stop
                    return
                url, req_headers, use_proxy, queue = request

                task = asyncio.create_task(
                    self.get_url_task(url, req_headers, use_proxy, queue))
                self.async_pending.add(task)

            if not self.async_pending:
                # Nothing to wait for: sleep instead of spinning, which
                # would burn CPU while other threads fill the queue.
                await asyncio.sleep(self.async_poll_interval)
                continue

            # The timeout here only bounds how long we go without
            # re-checking the queue; asyncio.wait() does not cancel
            # tasks on timeout. Per-request timeouts are enforced by
            # passing self.timeout to the fetch helpers.
            done, self.async_pending = await asyncio.wait(
                self.async_pending, timeout=self.async_poll_interval,
                return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                # NOTE(review): task.result() re-raises any exception
                # that escaped get_url_task(), which would end this loop
                # without replying on the request's queue -- confirm the
                # fetch helpers never raise.
                error, status, resp_headers, body, queue = task.result()
                queue.put_nowait((error, status, resp_headers, body))

    async def get_url_task(self, url, req_headers, use_proxy, queue):
        """Fetch one URL and return the result together with its queue.

        Returns a tuple ``(error, status, resp_headers, body, queue)``.
        For ``ftp://`` URLs status and headers are always ``None`` and a
        non-``None`` error is converted to ``str``. For other URLs the
        values come from ``_get_http`` unchanged; ``self.proxy`` is used
        only when ``use_proxy`` is true.
        """
        if url.startswith('ftp://'):
            error, body = await _get_ftp(
                url, timeout=self.timeout,
            )
            if error is not None:
                error = str(error)
                return error, None, None, None, queue
            return None, None, None, body, queue

        proxy = self.proxy if use_proxy else None

        error, status, resp_headers, body = await _get_http(
            url, req_headers, proxy=proxy,
            timeout=self.timeout,
        )
        return error, status, resp_headers, body, queue
- Robots: Removed ftp_timeout. Version 6.1.0 (2024-09-08) -- 2.39.5