--- /dev/null
+"""Robot based on aiohttp, processes multiple URLs in parallel
+
+This file is a part of Bookmarks database and Internet robot.
+
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['robot_multiaio']
+
+
+import asyncio
+import contextvars
+
+from Robots.bkmk_raiohttp import robot_aiohttp
+from Robots.multi_mixin import multi_mixin
+
+
+# Href of the bookmark being checked by the current asyncio task; set in
+# robot_multiaio.check_bookmark_async_log() and read by
+# robot_multiaio.__getattr__() to select the per-URL log buffer.
+current_href = contextvars.ContextVar('current_href')
+
+
+class robot_multiaio(multi_mixin, robot_aiohttp):
+ def __init__(self, *args, **kw):
+ multi_mixin.__init__(self, *args, **kw)
+ robot_aiohttp.__init__(self, *args, **kw)
+
+ # We need one event loop for the entire application
+ # so that we can save pending tasks between calls to self.wait().
+ # This also means we cannot use asyncio.run().
+ self.loop = loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+
+ # Rename self.log, create one log_lines list per URL
+ self.file_log = self.log
+ del self.log
+ self.logs = {} # Map {href: [log lines]}
+
+ def __getattr__(self, attr):
+ if attr != 'log':
+ raise AttributeError(attr)
+ href = current_href.get()
+ return self.logs[href].append
+
+ def version_str(self):
+ return super(robot_multiaio, self).version_str() + ' multi: aiohttp'
+
+ async def check_bookmark_async_log(self, bookmark):
+ current_href.set(bookmark.href)
+ await self.check_bookmark_async(bookmark)
+
+ def wait(self):
+ self.loop.run_until_complete(self.wait_async())
+
+ async def wait_async(self):
+ bookmarks = self.bookmarks
+ pending = self.pending
+
+ free_workers = self.max_urls - len(pending)
+ if bookmarks and (free_workers > 0): # we have job and free workers
+ for href in bookmarks:
+ bookmark, _, task = bookmarks[href]
+ if task is not None: # it's already pending
+ continue
+ task = asyncio.create_task(
+ self.check_bookmark_async_log(bookmark))
+ bookmarks[href] = [bookmark, None, task]
+ self.logs[href] = []
+ pending.add(task)
+ task.href = href
+
+ free_workers -= 1
+ if free_workers == 0:
+ break
+
+ if pending:
+ done, pending = await asyncio.wait(
+ pending, timeout=self.timeout+1,
+ return_when=asyncio.FIRST_COMPLETED)
+ self.pending = pending
+
+ for task in done:
+ bookmark, _, old_task = bookmarks.pop(task.href)
+ assert old_task is task
+ log = self.file_log
+ log_lines = self.logs.pop(bookmark.href)
+ log('Checked: %s' % bookmark.href)
+ if log_lines:
+ for line in log_lines:
+ log(line)
+ else:
+ if hasattr(bookmark, 'error'):
+ log(' Error: %s' % bookmark.error)
+ else:
+ log(' No logs')
+
+ def stop(self):
+ super(robot_multiaio, self).stop()
+ self.loop.close()