--- /dev/null
+++ Robots/aio_base.py
+"""Base class for async robots
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2025 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['aio_base']
+
+
+import asyncio
+import contextvars
+
+from Robots.multi_base import multi_base
+
+
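+# asyncio.create_task() runs each task in its own copy of the current
+# context, so every concurrent check sees its own value of current_href.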
+current_href = contextvars.ContextVar('current_href')
+
+
+class aio_base(multi_base):
+ def __init__(self, *args, **kw):
+ multi_base.__init__(self, *args, **kw)
+
+ # We need one event loop for the entire application
+ # so that we can save pending tasks between calls to self.wait().
+ # This also means we cannot use asyncio.run().
+ self.loop = loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+
+ # Rename self.log, create one log_lines list per URL
+ self.file_log = self.log
+ del self.log
+ self.logs = {} # Map {href: [log lines]}
+
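+    # 'log' was deleted from the instance in __init__, so every self.log
+    # lookup falls through to __getattr__, which returns the append method
+    # of the current URL's buffer: self.log(line) effectively does
+    # self.logs[current_href.get()].append(line).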
+ def __getattr__(self, attr):
+ if attr != 'log':
+ raise AttributeError(attr)
+ href = current_href.get()
+ return self.logs[href].append
+
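+    # Bind the bookmark's href into the task-local context before checking
+    # so that self.log() calls made during the check land in its own buffer.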
+ async def check_bookmark_async_log(self, bookmark):
+ current_href.set(bookmark.href)
+ await self.check_bookmark_async(bookmark)
+
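+    # A check may fetch a URL other than the bookmark's href (a redirect
+    # target, for example), so make sure the fetched URL has a buffer and
+    # point current_href at it.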
+ async def get_url(self, url, req_headers):
+ if url not in self.logs:
+ self.logs[url] = []
+ current_href.set(url)
+ return await super(aio_base, self).get_url(url, req_headers)
+
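+    # Synchronous wrapper: run one scheduling/waiting step of wait_async()
+    # on the shared event loop.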
+ def wait(self):
+ self.loop.run_until_complete(self.wait_async())
+
+ async def wait_async(self):
+ bookmarks = self.bookmarks
+ pending = self.pending
+
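+        # Schedule checks for bookmarks that don't have a task yet,
+        # as long as there are free worker slots.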
+ free_workers = self.max_urls - len(pending)
+        if bookmarks and (free_workers > 0):  # we have jobs and free workers
+ for href in bookmarks:
+ bookmark, _, task = bookmarks[href]
+ if task is not None: # it's already pending
+ continue
+ task = asyncio.create_task(
+ self.check_bookmark_async_log(bookmark))
+ bookmarks[href] = [bookmark, None, task]
+ if href not in self.logs:
+ self.logs[href] = []
+ pending.add(task)
+                task.href = href  # remember which bookmark this task serves
+
+ free_workers -= 1
+ if free_workers == 0:
+ break
+
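+        # Wait until at least one task finishes or the timeout expires.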
+ if pending:
+ done, pending = await asyncio.wait(
+ pending, timeout=self.timeout,
+ return_when=asyncio.FIRST_COMPLETED)
+ self.pending = pending
+
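+            # Report every finished task: flush its buffered log lines
+            # to the real (file) log.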
+            for task in done:
+                bookmark, _, old_task = bookmarks.pop(task.href)
+                assert old_task is task
+                log = self.file_log
+                log_lines = self.logs.pop(bookmark.href)
+                log('Checked: %s' % bookmark.href)
+                if log_lines:
+                    for line in log_lines:
+                        log(line)
+                elif hasattr(bookmark, 'error'):
+                    log(' Error: %s' % bookmark.error)
+                else:
+                    log(' No logs')
+
+ def stop(self):
+ super(aio_base, self).stop()
+ self.loop.close()
import aiohttp
import aiohttp.client_exceptions
-from Robots.multi_base import multi_base
+from Robots.aio_base import aio_base
-current_href = contextvars.ContextVar('current_href')
-
-
-class robot_aio(multi_base):
- def __init__(self, *args, **kw):
- multi_base.__init__(self, *args, **kw)
-
- # We need one event loop for the entire application
- # so that we can save pending tasks between calls to self.wait().
- # This also means we cannot use asyncio.run().
- self.loop = loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
-
- # Rename self.log, create one log_lines list per URL
- self.file_log = self.log
- del self.log
- self.logs = {} # Map {href: [log lines]}
-
- def __getattr__(self, attr):
- if attr != 'log':
- raise AttributeError(attr)
- href = current_href.get()
- return self.logs[href].append
-
+class robot_aio(aio_base):
def version_str(self):
return 'aiohttp/%s; multi' % aiohttp.__version__
- async def check_bookmark_async_log(self, bookmark):
- current_href.set(bookmark.href)
- await self.check_bookmark_async(bookmark)
-
- async def get_url(self, url, req_headers):
- if url not in self.logs:
- self.logs[url] = []
- current_href.set(url)
- return await super(robot_aio, self).get_url(url, req_headers)
-
async def get(self, url, req_headers, use_proxy=False):
if url.startswith('ftp://'):
error, body = await _get_ftp(
)
return error, status, resp_headers, body
- def wait(self):
- self.loop.run_until_complete(self.wait_async())
-
- async def wait_async(self):
- bookmarks = self.bookmarks
- pending = self.pending
-
- free_workers = self.max_urls - len(pending)
- if bookmarks and (free_workers > 0): # we have job and free workers
- for href in bookmarks:
- bookmark, _, task = bookmarks[href]
- if task is not None: # it's already pending
- continue
- task = asyncio.create_task(
- self.check_bookmark_async_log(bookmark))
- bookmarks[href] = [bookmark, None, task]
- if href not in self.logs:
- self.logs[href] = []
- pending.add(task)
- task.href = href
-
- free_workers -= 1
- if free_workers == 0:
- break
-
- if pending:
- done, pending = await asyncio.wait(
- pending, timeout=self.timeout,
- return_when=asyncio.FIRST_COMPLETED)
- self.pending = pending
-
- for task in done:
- bookmark, _, old_task = bookmarks.pop(task.href)
- assert old_task is task
- log = self.file_log
- log_lines = self.logs.pop(bookmark.href)
- log('Checked: %s' % bookmark.href)
- if log_lines:
- for line in log_lines:
- log(line)
- else:
- if hasattr(bookmark, 'error'):
- log(' Error: %s' % bookmark.error)
- else:
- log(' No logs')
-
- def stop(self):
- super(robot_aio, self).stop()
- self.loop.close()
-
async def _get_http(url, req_headers={}, proxy=None, timeout=60):
connector = None