From ceea1404b92a501543027954e8d2f212dd2e0dfc Mon Sep 17 00:00:00 2001 From: Oleg Broytman Date: Sat, 7 Sep 2024 14:39:21 +0300 Subject: [PATCH] Feat(aio): Combine `aiohttp` with `multiaio` The combined robot is named just `aio`. --- Robots/{bkmk_raiohttp.py => bkmk_raio.py} | 93 +++++++++++++++++++- Robots/bkmk_rmultiaio.py | 25 ------ Robots/multi_async_mixin.py | 100 ---------------------- doc/ANNOUNCE | 2 + doc/ChangeLog | 2 + 5 files changed, 94 insertions(+), 128 deletions(-) rename Robots/{bkmk_raiohttp.py => bkmk_raio.py} (52%) delete mode 100644 Robots/bkmk_rmultiaio.py delete mode 100644 Robots/multi_async_mixin.py diff --git a/Robots/bkmk_raiohttp.py b/Robots/bkmk_raio.py similarity index 52% rename from Robots/bkmk_raiohttp.py rename to Robots/bkmk_raio.py index 2b82fee..65c2234 100644 --- a/Robots/bkmk_raiohttp.py +++ b/Robots/bkmk_raio.py @@ -8,11 +8,12 @@ __author__ = "Oleg Broytman " __copyright__ = "Copyright (C) 2024 PhiloSoft Design" __license__ = "GNU GPL" -__all__ = ['robot_aiohttp'] +__all__ = ['robot_aio'] from urllib.parse import urlsplit import asyncio +import contextvars import ssl from aiohttp_socks import ProxyConnector @@ -23,11 +24,46 @@ import aiohttp import aiohttp.client_exceptions from Robots.base import robot_base +from Robots.multi_mixin import multi_mixin -class robot_aiohttp(robot_base): +current_href = contextvars.ContextVar('current_href') + + +class robot_aio(multi_mixin, robot_base): + def __init__(self, *args, **kw): + robot_base.__init__(self, *args, **kw) + multi_mixin.__init__(self, *args, **kw) + + # We need one event loop for the entire application + # so that we can save pending tasks between calls to self.wait(). + # This also means we cannot use asyncio.run(). + self.loop = loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Rename self.log, create one log_lines list per URL + self.file_log = self.log + del self.log + self.logs = {} # Map {href: [log lines]} + + def __getattr__(self, attr): + if attr != 'log': + raise AttributeError(attr) + href = current_href.get() + return self.logs[href].append + def version_str(self): - return 'aiohttp/%s' % aiohttp.__version__ + return 'aiohttp/%s; multi' % aiohttp.__version__ + + async def check_bookmark_async_log(self, bookmark): + current_href.set(bookmark.href) + await self.check_bookmark_async(bookmark) + + async def get_url(self, url, req_headers): + if url not in self.logs: + self.logs[url] = [] + current_href.set(url) + return await super(robot_aio, self).get_url(url, req_headers) async def get(self, url, req_headers, use_proxy=False): if url.startswith('ftp://'): @@ -50,9 +86,59 @@ class robot_aiohttp(robot_base): ) return error, status, resp_headers, body + def wait(self): + self.loop.run_until_complete(self.wait_async()) + + async def wait_async(self): + bookmarks = self.bookmarks + pending = self.pending + + free_workers = self.max_urls - len(pending) + if bookmarks and (free_workers > 0): # we have job and free workers + for href in bookmarks: + bookmark, _, task = bookmarks[href] + if task is not None: # it's already pending + continue + task = asyncio.create_task( + self.check_bookmark_async_log(bookmark)) + bookmarks[href] = [bookmark, None, task] + if href not in self.logs: + self.logs[href] = [] + pending.add(task) + task.href = href + + free_workers -= 1 + if free_workers == 0: + break + + if pending: + done, pending = await asyncio.wait( + pending, timeout=self.timeout+1, + return_when=asyncio.FIRST_COMPLETED) + self.pending = pending + + for task in done: + 
bookmark, _, old_task = bookmarks.pop(task.href) + assert old_task is task + log = self.file_log + log_lines = self.logs.pop(bookmark.href) + log('Checked: %s' % bookmark.href) + if log_lines: + for line in log_lines: + log(line) + else: + if hasattr(bookmark, 'error'): + log(' Error: %s' % bookmark.error) + else: + log(' No logs') + def get_ftp_welcome(self): return '' # We don't store welcome message yet + def stop(self): + super(robot_aio, self).stop() + self.loop.close() + async def _get_http(url, req_headers={}, proxy=None, timeout=60): connector = None @@ -69,6 +155,7 @@ async def _get_http(url, req_headers={}, proxy=None, timeout=60): username=pusername, password=ppassword, rdns=rdns, ) proxy = None + timeout = aiohttp.ClientTimeout(connect=timeout, total=timeout) ssl_context = create_urllib3_context(cert_reqs=0, ciphers='ALL:@SECLEVEL=1') diff --git a/Robots/bkmk_rmultiaio.py b/Robots/bkmk_rmultiaio.py deleted file mode 100644 index 19f3062..0000000 --- a/Robots/bkmk_rmultiaio.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Robot based on aiohttp, processes multiple URLs in parallel - -This file is a part of Bookmarks database and Internet robot. - -""" - -__author__ = "Oleg Broytman " -__copyright__ = "Copyright (C) 2024 PhiloSoft Design" -__license__ = "GNU GPL" - -__all__ = ['robot_multiaio'] - - -from Robots.bkmk_raiohttp import robot_aiohttp -from Robots.multi_async_mixin import multi_async_mixin - - -class robot_multiaio(multi_async_mixin, robot_aiohttp): - def __init__(self, *args, **kw): - multi_async_mixin.__init__(self, *args, **kw) - robot_aiohttp.__init__(self, *args, **kw) - self._init() - - def version_str(self): - return super(robot_multiaio, self).version_str() + ' multi: aiohttp' diff --git a/Robots/multi_async_mixin.py b/Robots/multi_async_mixin.py deleted file mode 100644 index 7056979..0000000 --- a/Robots/multi_async_mixin.py +++ /dev/null @@ -1,100 +0,0 @@ -"""Mix-in for async robots that process multiple URLs in parallel. - -This file is a part of Bookmarks database and Internet robot. - -""" - -__author__ = "Oleg Broytman " -__copyright__ = "Copyright (C) 2024 PhiloSoft Design" -__license__ = "GNU GPL" - -__all__ = ['multi_async_mixin'] - - -import asyncio -import contextvars - -from Robots.multi_mixin import multi_mixin - - -current_href = contextvars.ContextVar('current_href') - - -class multi_async_mixin(multi_mixin): - def _init(self): - # We need one event loop for the entire application - # so that we can save pending tasks between calls to self.wait(). - # This also means we cannot use asyncio.run(). 
- self.loop = loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Rename self.log, create one log_lines list per URL - self.file_log = self.log - del self.log - self.logs = {} # Map {href: [log lines]} - - def __getattr__(self, attr): - if attr != 'log': - raise AttributeError(attr) - href = current_href.get() - return self.logs[href].append - - async def check_bookmark_async_log(self, bookmark): - current_href.set(bookmark.href) - await self.check_bookmark_async(bookmark) - - async def get_url(self, url, req_headers): - if url not in self.logs: - self.logs[url] = [] - current_href.set(url) - return await super(multi_async_mixin, self).get_url(url, req_headers) - - def wait(self): - self.loop.run_until_complete(self.wait_async()) - - async def wait_async(self): - bookmarks = self.bookmarks - pending = self.pending - - free_workers = self.max_urls - len(pending) - if bookmarks and (free_workers > 0): # we have job and free workers - for href in bookmarks: - bookmark, _, task = bookmarks[href] - if task is not None: # it's already pending - continue - task = asyncio.create_task( - self.check_bookmark_async_log(bookmark)) - bookmarks[href] = [bookmark, None, task] - if href not in self.logs: - self.logs[href] = [] - pending.add(task) - task.href = href - - free_workers -= 1 - if free_workers == 0: - break - - if pending: - done, pending = await asyncio.wait( - pending, timeout=self.timeout+1, - return_when=asyncio.FIRST_COMPLETED) - self.pending = pending - - for task in done: - bookmark, _, old_task = bookmarks.pop(task.href) - assert old_task is task - log = self.file_log - log_lines = self.logs.pop(bookmark.href) - log('Checked: %s' % bookmark.href) - if log_lines: - for line in log_lines: - log(line) - else: - if hasattr(bookmark, 'error'): - log(' Error: %s' % bookmark.error) - else: - log(' No logs') - - def stop(self): - super(multi_async_mixin, self).stop() - self.loop.close() diff --git a/doc/ANNOUNCE b/doc/ANNOUNCE index 8d6e40e..14fda05 100644 --- a/doc/ANNOUNCE +++ b/doc/ANNOUNCE @@ -9,6 +9,8 @@ WHAT'S NEW Version 6.1.0 (2024-??-??) + Combine aiohttp with multiaio; the combined robot is named just aio. + Make bkmk_rmultirequests always multiprocess. Version 6.0.0 (2024-08-19) diff --git a/doc/ChangeLog b/doc/ChangeLog index bb90fc8..ad7fe9c 100644 --- a/doc/ChangeLog +++ b/doc/ChangeLog @@ -1,5 +1,7 @@ Version 6.1.0 (2024-??-??) + Combine aiohttp with multiaio; the combined robot is named just aio. + Make bkmk_rmultirequests always multiprocess. Version 6.0.0 (2024-08-19) -- 2.39.5
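
Below the patch proper, a minimal self-contained sketch of the two patterns
the diff introduces: a contextvars.ContextVar that routes self.log() calls to
a per-URL list of lines, and an asyncio.wait(..., return_when=FIRST_COMPLETED)
loop that flushes each URL's lines as one block when its task finishes. All
names here (DemoRobot, check, run) are hypothetical stand-ins, with a sleep in
place of real HTTP I/O; note the real robot keeps one long-lived event loop
across wait() calls and therefore cannot use asyncio.run(), but this demo
drains everything in a single call, so it can.

import asyncio
import contextvars

current_href = contextvars.ContextVar('current_href')


class DemoRobot:  # hypothetical stand-in for robot_aio
    def __init__(self):
        self.logs = {}       # Map {href: [log lines]}, as in the patch
        self.pending = set()

    def file_log(self, line):
        # The real robot renames its file logger: self.file_log = self.log
        print(line)

    def __getattr__(self, attr):
        # Same trick as the patch: self.log resolves to the append method
        # of the line list belonging to whatever URL this task handles.
        if attr != 'log':
            raise AttributeError(attr)
        return self.logs[current_href.get()].append

    async def check(self, href):
        current_href.set(href)     # contextvars keep this per-task
        self.logs.setdefault(href, [])
        self.log('  fetching')     # lands in self.logs[href]
        await asyncio.sleep(0.01)  # stands in for the real HTTP request
        self.log('  done')

    async def run(self, hrefs):
        for href in hrefs:
            task = asyncio.create_task(self.check(href))
            task.href = href       # lets the flush loop find the URL back
            self.pending.add(task)
        while self.pending:
            done, self.pending = await asyncio.wait(
                self.pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:      # flush one URL's lines as a block
                self.file_log('Checked: %s' % task.href)
                for line in self.logs.pop(task.href):
                    self.file_log(line)


if __name__ == '__main__':
    asyncio.run(DemoRobot().run(['http://a.example', 'http://b.example']))

Sticking href onto each Task is what lets the flush loop map a completed task
back to its URL without a side table, and popping self.logs there keeps the
per-URL log map from growing across calls.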