__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
__license__ = "GNU GPL"
-__all__ = ['robot_aiohttp']
+__all__ = ['robot_aio']
from urllib.parse import urlsplit
import asyncio
+import contextvars
import ssl
from aiohttp_socks import ProxyConnector
import aiohttp.client_exceptions
from Robots.base import robot_base
+from Robots.multi_mixin import multi_mixin
-class robot_aiohttp(robot_base):
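+# Per-task context variable recording the URL the current task is
+# checking; every asyncio task runs in its own copy of the context,
+# so concurrent checks don't clobber each other's value.
+# __getattr__ below uses it to route self.log() to the right log list.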
+current_href = contextvars.ContextVar('current_href')
+
+
+class robot_aio(multi_mixin, robot_base):
+    def __init__(self, *args, **kw):
+        robot_base.__init__(self, *args, **kw)
+        multi_mixin.__init__(self, *args, **kw)
+
+        # We need one event loop for the entire application
+        # so that we can save pending tasks between calls to self.wait().
+        # This also means we cannot use asyncio.run().
+        self.loop = loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+        # Rename self.log, create one log_lines list per URL
+        self.file_log = self.log
+        del self.log
+        self.logs = {}  # Map {href: [log lines]}
+
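+    # self.log was deleted in __init__, so every self.log lookup falls
+    # through to __getattr__, which returns the append method of the
+    # current URL's log list: self.log(line) collects lines per URL.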
+    def __getattr__(self, attr):
+        if attr != 'log':
+            raise AttributeError(attr)
+        href = current_href.get()
+        return self.logs[href].append
+
    def version_str(self):
-        return 'aiohttp/%s' % aiohttp.__version__
+        return 'aiohttp/%s; multi' % aiohttp.__version__
+
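+    # Bind the running task's context to the bookmark's URL so that
+    # self.log() calls made while checking it go to its own log list.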
+    async def check_bookmark_async_log(self, bookmark):
+        current_href.set(bookmark.href)
+        await self.check_bookmark_async(bookmark)
+
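+    # Ensure a log list exists for the URL and rebind the context
+    # before delegating to the base class implementation.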
+    async def get_url(self, url, req_headers):
+        if url not in self.logs:
+            self.logs[url] = []
+        current_href.set(url)
+        return await super(robot_aio, self).get_url(url, req_headers)
    async def get(self, url, req_headers, use_proxy=False):
        if url.startswith('ftp://'):
        )
        return error, status, resp_headers, body
+    def wait(self):
+        self.loop.run_until_complete(self.wait_async())
+
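+    # One scheduling step: start tasks for bookmarks that don't have one
+    # yet while there are free worker slots; each task remembers its href
+    # so the finished bookmark can be found when the task completes.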
+    async def wait_async(self):
+        bookmarks = self.bookmarks
+        pending = self.pending
+
+        free_workers = self.max_urls - len(pending)
+        if bookmarks and (free_workers > 0):  # we have jobs and free workers
+            for href in bookmarks:
+                bookmark, _, task = bookmarks[href]
+                if task is not None:  # it's already pending
+                    continue
+                task = asyncio.create_task(
+                    self.check_bookmark_async_log(bookmark))
+                bookmarks[href] = [bookmark, None, task]
+                if href not in self.logs:
+                    self.logs[href] = []
+                pending.add(task)
+                task.href = href
+
+                free_workers -= 1
+                if free_workers == 0:
+                    break
+
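+        # Wait until at least one task completes (or the timeout expires),
+        # then report the finished bookmarks and flush their log lines.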
+        if pending:
+            done, pending = await asyncio.wait(
+                pending, timeout=self.timeout+1,
+                return_when=asyncio.FIRST_COMPLETED)
+            self.pending = pending
+
+            for task in done:
+                bookmark, _, old_task = bookmarks.pop(task.href)
+                assert old_task is task
+                log = self.file_log
+                log_lines = self.logs.pop(bookmark.href)
+                log('Checked: %s' % bookmark.href)
+                if log_lines:
+                    for line in log_lines:
+                        log(line)
+                elif hasattr(bookmark, 'error'):
+                    log(' Error: %s' % bookmark.error)
+                else:
+                    log(' No logs')
+
    def get_ftp_welcome(self):
        return ''  # We don't store welcome message yet
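+
+    # Close our private event loop after the usual robot cleanup.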
+    def stop(self):
+        super(robot_aio, self).stop()
+        self.loop.close()
+
async def _get_http(url, req_headers={}, proxy=None, timeout=60):
    connector = None
            username=pusername, password=ppassword, rdns=rdns,
        )
        proxy = None
+
    timeout = aiohttp.ClientTimeout(connect=timeout, total=timeout)
    ssl_context = create_urllib3_context(cert_reqs=0,
                                         ciphers='ALL:@SECLEVEL=1')
--- a/Robots/bkmk_rmultiaio.py
+++ /dev/null
-"""Robot based on aiohttp, processes multiple URLs in parallel
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman <phd@phdru.name>"
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_multiaio']
-
-
-from Robots.bkmk_raiohttp import robot_aiohttp
-from Robots.multi_async_mixin import multi_async_mixin
-
-
-class robot_multiaio(multi_async_mixin, robot_aiohttp):
-    def __init__(self, *args, **kw):
-        multi_async_mixin.__init__(self, *args, **kw)
-        robot_aiohttp.__init__(self, *args, **kw)
-        self._init()
-
-    def version_str(self):
-        return super(robot_multiaio, self).version_str() + ' multi: aiohttp'
--- a/Robots/multi_async_mixin.py
+++ /dev/null
-"""Mix-in for async robots that process multiple URLs in parallel.
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman <phd@phdru.name>"
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['multi_async_mixin']
-
-
-import asyncio
-import contextvars
-
-from Robots.multi_mixin import multi_mixin
-
-
-current_href = contextvars.ContextVar('current_href')
-
-
-class multi_async_mixin(multi_mixin):
-    def _init(self):
-        # We need one event loop for the entire application
-        # so that we can save pending tasks between calls to self.wait().
-        # This also means we cannot use asyncio.run().
-        self.loop = loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-
-        # Rename self.log, create one log_lines list per URL
-        self.file_log = self.log
-        del self.log
-        self.logs = {}  # Map {href: [log lines]}
-
-    def __getattr__(self, attr):
-        if attr != 'log':
-            raise AttributeError(attr)
-        href = current_href.get()
-        return self.logs[href].append
-
-    async def check_bookmark_async_log(self, bookmark):
-        current_href.set(bookmark.href)
-        await self.check_bookmark_async(bookmark)
-
-    async def get_url(self, url, req_headers):
-        if url not in self.logs:
-            self.logs[url] = []
-        current_href.set(url)
-        return await super(multi_async_mixin, self).get_url(url, req_headers)
-
-    def wait(self):
-        self.loop.run_until_complete(self.wait_async())
-
-    async def wait_async(self):
-        bookmarks = self.bookmarks
-        pending = self.pending
-
-        free_workers = self.max_urls - len(pending)
-        if bookmarks and (free_workers > 0):  # we have job and free workers
-            for href in bookmarks:
-                bookmark, _, task = bookmarks[href]
-                if task is not None:  # it's already pending
-                    continue
-                task = asyncio.create_task(
-                    self.check_bookmark_async_log(bookmark))
-                bookmarks[href] = [bookmark, None, task]
-                if href not in self.logs:
-                    self.logs[href] = []
-                pending.add(task)
-                task.href = href
-
-                free_workers -= 1
-                if free_workers == 0:
-                    break
-
-        if pending:
-            done, pending = await asyncio.wait(
-                pending, timeout=self.timeout+1,
-                return_when=asyncio.FIRST_COMPLETED)
-            self.pending = pending
-
-            for task in done:
-                bookmark, _, old_task = bookmarks.pop(task.href)
-                assert old_task is task
-                log = self.file_log
-                log_lines = self.logs.pop(bookmark.href)
-                log('Checked: %s' % bookmark.href)
-                if log_lines:
-                    for line in log_lines:
-                        log(line)
-                else:
-                    if hasattr(bookmark, 'error'):
-                        log(' Error: %s' % bookmark.error)
-                    else:
-                        log(' No logs')
-
-    def stop(self):
-        super(multi_async_mixin, self).stop()
-        self.loop.close()