From a52a3058891e23f09b3fb73966619bb4fe789895 Mon Sep 17 00:00:00 2001
From: Oleg Broytman <phd@phdru.name>
Date: Thu, 6 Mar 2025 17:34:11 +0300
Subject: [PATCH] Refactor(Robots): Split `bkmk_raio.py` into `aio_base.py`

---
 Robots/aio_base.py  | 102 ++++++++++++++++++++++++++++++++++++++++++++
 Robots/bkmk_raio.py |  87 +------------------------------------
 2 files changed, 104 insertions(+), 85 deletions(-)
 create mode 100644 Robots/aio_base.py

diff --git a/Robots/aio_base.py b/Robots/aio_base.py
new file mode 100644
index 0000000..ce5a26d
--- /dev/null
+++ b/Robots/aio_base.py
@@ -0,0 +1,102 @@
+"""Base class for async robots
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2025 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['aio_base']
+
+
+import asyncio
+import contextvars
+
+from Robots.multi_base import multi_base
+
+
+current_href = contextvars.ContextVar('current_href')
+
+
+class aio_base(multi_base):
+    def __init__(self, *args, **kw):
+        multi_base.__init__(self, *args, **kw)
+
+        # We need one event loop for the entire application
+        # so that we can save pending tasks between calls to self.wait().
+        # This also means we cannot use asyncio.run().
+        self.loop = loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+        # Rename self.log, create one log_lines list per URL
+        self.file_log = self.log
+        del self.log
+        self.logs = {}  # Map {href: [log lines]}
+
+    def __getattr__(self, attr):
+        if attr != 'log':
+            raise AttributeError(attr)
+        href = current_href.get()
+        return self.logs[href].append
+
+    async def check_bookmark_async_log(self, bookmark):
+        current_href.set(bookmark.href)
+        await self.check_bookmark_async(bookmark)
+
+    async def get_url(self, url, req_headers):
+        if url not in self.logs:
+            self.logs[url] = []
+        current_href.set(url)
+        return await super(aio_base, self).get_url(url, req_headers)
+
+    def wait(self):
+        self.loop.run_until_complete(self.wait_async())
+
+    async def wait_async(self):
+        bookmarks = self.bookmarks
+        pending = self.pending
+
+        free_workers = self.max_urls - len(pending)
+        if bookmarks and (free_workers > 0):  # we have job and free workers
+            for href in bookmarks:
+                bookmark, _, task = bookmarks[href]
+                if task is not None:  # it's already pending
+                    continue
+                task = asyncio.create_task(
+                    self.check_bookmark_async_log(bookmark))
+                bookmarks[href] = [bookmark, None, task]
+                if href not in self.logs:
+                    self.logs[href] = []
+                pending.add(task)
+                task.href = href
+
+                free_workers -= 1
+                if free_workers == 0:
+                    break
+
+        if pending:
+            done, pending = await asyncio.wait(
+                pending, timeout=self.timeout,
+                return_when=asyncio.FIRST_COMPLETED)
+            self.pending = pending
+
+            for task in done:
+                bookmark, _, old_task = bookmarks.pop(task.href)
+                assert old_task is task
+                log = self.file_log
+                log_lines = self.logs.pop(bookmark.href)
+                log('Checked: %s' % bookmark.href)
+                if log_lines:
+                    for line in log_lines:
+                        log(line)
+                else:
+                    if hasattr(bookmark, 'error'):
+                        log('  Error: %s' % bookmark.error)
+                    else:
+                        log('  No logs')
+
+    def stop(self):
+        super(aio_base, self).stop()
+        self.loop.close()
diff --git a/Robots/bkmk_raio.py b/Robots/bkmk_raio.py
index bb45797..1786a29 100644
--- a/Robots/bkmk_raio.py
+++ b/Robots/bkmk_raio.py
@@ -23,46 +23,13 @@ import aioftp
 import aiohttp
 import aiohttp.client_exceptions
 
-from Robots.multi_base import multi_base
+from Robots.aio_base import aio_base
 
 
-current_href = contextvars.ContextVar('current_href')
-
-
-class robot_aio(multi_base):
-    def __init__(self, *args, **kw):
-        multi_base.__init__(self, *args, **kw)
-
-        # We need one event loop for the entire application
-        # so that we can save pending tasks between calls to self.wait().
-        # This also means we cannot use asyncio.run().
-        self.loop = loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-
-        # Rename self.log, create one log_lines list per URL
-        self.file_log = self.log
-        del self.log
-        self.logs = {}  # Map {href: [log lines]}
-
-    def __getattr__(self, attr):
-        if attr != 'log':
-            raise AttributeError(attr)
-        href = current_href.get()
-        return self.logs[href].append
-
+class robot_aio(aio_base):
     def version_str(self):
         return 'aiohttp/%s; multi' % aiohttp.__version__
 
-    async def check_bookmark_async_log(self, bookmark):
-        current_href.set(bookmark.href)
-        await self.check_bookmark_async(bookmark)
-
-    async def get_url(self, url, req_headers):
-        if url not in self.logs:
-            self.logs[url] = []
-        current_href.set(url)
-        return await super(robot_aio, self).get_url(url, req_headers)
-
     async def get(self, url, req_headers, use_proxy=False):
         if url.startswith('ftp://'):
             error, body = await _get_ftp(
@@ -84,56 +51,6 @@ class robot_aio(multi_base):
         )
         return error, status, resp_headers, body
 
-    def wait(self):
-        self.loop.run_until_complete(self.wait_async())
-
-    async def wait_async(self):
-        bookmarks = self.bookmarks
-        pending = self.pending
-
-        free_workers = self.max_urls - len(pending)
-        if bookmarks and (free_workers > 0):  # we have job and free workers
-            for href in bookmarks:
-                bookmark, _, task = bookmarks[href]
-                if task is not None:  # it's already pending
-                    continue
-                task = asyncio.create_task(
-                    self.check_bookmark_async_log(bookmark))
-                bookmarks[href] = [bookmark, None, task]
-                if href not in self.logs:
-                    self.logs[href] = []
-                pending.add(task)
-                task.href = href
-
-                free_workers -= 1
-                if free_workers == 0:
-                    break
-
-        if pending:
-            done, pending = await asyncio.wait(
-                pending, timeout=self.timeout,
-                return_when=asyncio.FIRST_COMPLETED)
-            self.pending = pending
-
-            for task in done:
-                bookmark, _, old_task = bookmarks.pop(task.href)
-                assert old_task is task
-                log = self.file_log
-                log_lines = self.logs.pop(bookmark.href)
-                log('Checked: %s' % bookmark.href)
-                if log_lines:
-                    for line in log_lines:
-                        log(line)
-                else:
-                    if hasattr(bookmark, 'error'):
-                        log('  Error: %s' % bookmark.error)
-                    else:
-                        log('  No logs')
-
-    def stop(self):
-        super(robot_aio, self).stop()
-        self.loop.close()
-
 
 async def _get_http(url, req_headers={}, proxy=None, timeout=60):
     connector = None
-- 
2.39.5
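
Note (commentary below the patch trailer, not part of the diff): the trick this
refactoring moves into `aio_base` is the per-URL log routing.  Because many
bookmark checks run concurrently on one event loop, a bare `self.log()` would
interleave lines from different URLs; instead a `contextvars.ContextVar`
records which URL the current task is serving, and `__getattr__` resolves
`self.log` to the `append` method of that URL's buffer.  The following is a
minimal standalone sketch of the same pattern; the names `log_demo` and
`worker` are invented for the example and do not exist in the repository.

import asyncio
import contextvars

current_href = contextvars.ContextVar('current_href')


class log_demo:
    """Standalone sketch of aio_base's per-URL log routing."""
    def __init__(self):
        self.logs = {}  # Map {href: [log lines]}

    def __getattr__(self, attr):
        # Only a missing 'log' attribute is synthesized;
        # anything else is a genuine AttributeError.
        if attr != 'log':
            raise AttributeError(attr)
        # Each task sees its own value of current_href, so self.log
        # appends to the buffer of the URL that task is checking.
        href = current_href.get()
        return self.logs[href].append


async def worker(robot, href):
    # asyncio.create_task() runs each task in a copy of the current
    # context, so set() here cannot leak into sibling tasks.
    current_href.set(href)
    robot.logs.setdefault(href, [])
    await asyncio.sleep(0)  # yield so the workers interleave
    robot.log('fetched %s' % href)


async def main():
    robot = log_demo()
    await asyncio.gather(
        asyncio.create_task(worker(robot, 'http://a.example/')),
        asyncio.create_task(worker(robot, 'http://b.example/')))
    print(robot.logs)  # each href holds only its own lines


asyncio.run(main())

Unlike this sketch, `aio_base` deliberately avoids `asyncio.run()`: it keeps a
single long-lived loop in `self.loop` so tasks still pending after one `wait()`
call survive into the next, whereas `asyncio.run()` creates and closes a fresh
loop on every call.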