 __all__ = ['robot_multiaio']
-import asyncio
-import contextvars
-
 from Robots.bkmk_raiohttp import robot_aiohttp
-from Robots.multi_mixin import multi_mixin
-
+from Robots.multi_async_mixin import multi_async_mixin
-current_href = contextvars.ContextVar('current_href')
-
-class robot_multiaio(multi_mixin, robot_aiohttp):
+class robot_multiaio(multi_async_mixin, robot_aiohttp):
     def __init__(self, *args, **kw):
-        multi_mixin.__init__(self, *args, **kw)
+        multi_async_mixin.__init__(self, *args, **kw)
         robot_aiohttp.__init__(self, *args, **kw)
-
-        # We need one event loop for the entire application
-        # so that we can save pending tasks between calls to self.wait().
-        # This also means we cannot use asyncio.run().
-        self.loop = loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-
-        # Rename self.log, create one log_lines list per URL
-        self.file_log = self.log
-        del self.log
-        self.logs = {}  # Map {href: [log lines]}
-
-    def __getattr__(self, attr):
-        if attr != 'log':
-            raise AttributeError(attr)
-        href = current_href.get()
-        return self.logs[href].append
+        self._init()
 
     def version_str(self):
         return super(robot_multiaio, self).version_str() + ' multi: aiohttp'
-
-    async def check_bookmark_async_log(self, bookmark):
-        current_href.set(bookmark.href)
-        await self.check_bookmark_async(bookmark)
-
-    async def get_url(self, url, headers):
-        if url not in self.logs:
-            self.logs[url] = []
-        current_href.set(url)
-        return await super(robot_multiaio, self).get_url(url, headers)
-
-    def wait(self):
-        self.loop.run_until_complete(self.wait_async())
-
-    async def wait_async(self):
-        bookmarks = self.bookmarks
-        pending = self.pending
-
-        free_workers = self.max_urls - len(pending)
-        if bookmarks and (free_workers > 0):  # we have job and free workers
-            for href in bookmarks:
-                bookmark, _, task = bookmarks[href]
-                if task is not None:  # it's already pending
-                    continue
-                task = asyncio.create_task(
-                    self.check_bookmark_async_log(bookmark))
-                bookmarks[href] = [bookmark, None, task]
-                if href not in self.logs:
-                    self.logs[href] = []
-                pending.add(task)
-                task.href = href
-
-                free_workers -= 1
-                if free_workers == 0:
-                    break
-
-        if pending:
-            done, pending = await asyncio.wait(
-                pending, timeout=self.timeout+1,
-                return_when=asyncio.FIRST_COMPLETED)
-            self.pending = pending
-
-            for task in done:
-                bookmark, _, old_task = bookmarks.pop(task.href)
-                assert old_task is task
-                log = self.file_log
-                log_lines = self.logs.pop(bookmark.href)
-                log('Checked: %s' % bookmark.href)
-                if log_lines:
-                    for line in log_lines:
-                        log(line)
-                else:
-                    if hasattr(bookmark, 'error'):
-                        log('  Error: %s' % bookmark.error)
-                    else:
-                        log('  No logs')
-
-    def stop(self):
-        super(robot_multiaio, self).stop()
-        self.loop.close()
--- /dev/null
+++ b/Robots/multi_async_mixin.py
+"""Mix-in for async robots that process multiple URLs in parallel.
+
+This file is a part of Bookmarks database and Internet robot.
+
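+This mix-in keeps one event loop for the whole program so that pending
+tasks survive between calls to wait(), and collects log lines per URL
+(via a contextvars.ContextVar) so that output from concurrent checks is
+not interleaved; each URL's lines are flushed to the real log when its
+check finishes.
+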
+"""
+
+__author__ = "Oleg Broytman <phd@phdru.name>"
+__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
+__license__ = "GNU GPL"
+
+__all__ = ['multi_async_mixin']
+
+
+import asyncio
+import contextvars
+
+from Robots.multi_mixin import multi_mixin
+
+
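+# The URL whose log list receives self.log(...) lines.  Every asyncio
+# task runs in its own copy of the context, so concurrent checks can
+# set this without stepping on each other.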
+current_href = contextvars.ContextVar('current_href')
+
+
+class multi_async_mixin(multi_mixin):
+    def _init(self):
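+        # Not part of __init__: the concrete robot class calls this after
+        # all base classes have been initialized, i.e. once self.log exists.
+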
+        # We need one event loop for the entire application
+        # so that we can save pending tasks between calls to self.wait().
+        # This also means we cannot use asyncio.run().
+        self.loop = loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+        # Rename self.log, create one log_lines list per URL
+        self.file_log = self.log
+        del self.log
+        self.logs = {}  # Map {href: [log lines]}
+
+    def __getattr__(self, attr):
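+        # Only consulted for attributes missing from the instance; 'log'
+        # was deleted in _init(), so every self.log(...) call lands here
+        # and appends to the per-URL list selected by current_href.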
+        if attr != 'log':
+            raise AttributeError(attr)
+        href = current_href.get()
+        return self.logs[href].append
+
+    async def check_bookmark_async_log(self, bookmark):
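+        # Bind this bookmark's URL to the context variable so that all
+        # log lines produced while checking it go to its own list.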
+        current_href.set(bookmark.href)
+        await self.check_bookmark_async(bookmark)
+
+    async def get_url(self, url, headers):
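+        # get_url() may be called for a URL other than the bookmark's
+        # (a redirect, for example), so ensure the current URL has a log
+        # list and route logging to it.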
+        if url not in self.logs:
+            self.logs[url] = []
+        current_href.set(url)
+        return await super(multi_async_mixin, self).get_url(url, headers)
+
+    def wait(self):
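+        # One round of scheduling and collection on the shared loop;
+        # tasks that are still pending survive until the next call.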
+        self.loop.run_until_complete(self.wait_async())
+
+    async def wait_async(self):
+        bookmarks = self.bookmarks
+        pending = self.pending
+
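+        # Schedule new checks, but keep at most max_urls tasks in flight.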
+        free_workers = self.max_urls - len(pending)
+        if bookmarks and (free_workers > 0):  # we have jobs and free workers
+            for href in bookmarks:
+                bookmark, _, task = bookmarks[href]
+                if task is not None:  # it's already pending
+                    continue
+                task = asyncio.create_task(
+                    self.check_bookmark_async_log(bookmark))
+                bookmarks[href] = [bookmark, None, task]
+                if href not in self.logs:
+                    self.logs[href] = []
+                pending.add(task)
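+                # Stash the key on the task itself so the completion loop
+                # below can map a finished task back to its bookmark.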
+                task.href = href
+
+                free_workers -= 1
+                if free_workers == 0:
+                    break
+
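+        # Wait for at least one task to finish.  asyncio.wait() does not
+        # cancel tasks that exceed the timeout; they stay in self.pending
+        # and are picked up again on the next round.  Waiting a second
+        # longer than self.timeout leaves room for the tasks' own network
+        # timeouts to fire first.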
+        if pending:
+            done, pending = await asyncio.wait(
+                pending, timeout=self.timeout+1,
+                return_when=asyncio.FIRST_COMPLETED)
+            self.pending = pending
+
+            for task in done:
+                bookmark, _, old_task = bookmarks.pop(task.href)
+                assert old_task is task
+                log = self.file_log
+                log_lines = self.logs.pop(bookmark.href)
+                log('Checked: %s' % bookmark.href)
+                if log_lines:
+                    for line in log_lines:
+                        log(line)
+                elif hasattr(bookmark, 'error'):
+                    log('  Error: %s' % bookmark.error)
+                else:
+                    log('  No logs')
+
+    def stop(self):
+        super(multi_async_mixin, self).stop()
+        self.loop.close()