]> git.phdru.name Git - bookmarks_db.git/commitdiff
Feat(aio): Combine `aiohttp` with `multiaio`
authorOleg Broytman <phd@phdru.name>
Sat, 7 Sep 2024 11:39:21 +0000 (14:39 +0300)
committerOleg Broytman <phd@phdru.name>
Sat, 7 Sep 2024 15:59:16 +0000 (18:59 +0300)
The combined robot is named just `aio`.

Robots/bkmk_raio.py [moved from Robots/bkmk_raiohttp.py with 52% similarity]
Robots/bkmk_rmultiaio.py [deleted file]
Robots/multi_async_mixin.py [deleted file]
doc/ANNOUNCE
doc/ChangeLog

similarity index 52%
rename from Robots/bkmk_raiohttp.py
rename to Robots/bkmk_raio.py
index 2b82fee52095be4d38b360f2154d9e07e42483fb..65c22344f7cfa0780c4b193deecd965f059c5a61 100644 (file)
@@ -8,11 +8,12 @@ __author__ = "Oleg Broytman <phd@phdru.name>"
 __copyright__ = "Copyright (C) 2024 PhiloSoft Design"
 __license__ = "GNU GPL"
 
-__all__ = ['robot_aiohttp']
+__all__ = ['robot_aio']
 
 
 from urllib.parse import urlsplit
 import asyncio
+import contextvars
 import ssl
 
 from aiohttp_socks import ProxyConnector
@@ -23,11 +24,46 @@ import aiohttp
 import aiohttp.client_exceptions
 
 from Robots.base import robot_base
+from Robots.multi_mixin import multi_mixin
 
 
-class robot_aiohttp(robot_base):
+current_href = contextvars.ContextVar('current_href')
+
+
+class robot_aio(multi_mixin, robot_base):
+    def __init__(self, *args, **kw):
+        robot_base.__init__(self, *args, **kw)
+        multi_mixin.__init__(self, *args, **kw)
+
+        # We need one event loop for the entire application
+        # so that we can save pending tasks between calls to self.wait().
+        # This also means we cannot use asyncio.run().
+        self.loop = loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+        # Rename self.log, create one log_lines list per URL
+        self.file_log = self.log
+        del self.log
+        self.logs = {}  # Map {href: [log lines]}
+
+    def __getattr__(self, attr):
+        if attr != 'log':
+            raise AttributeError(attr)
+        href = current_href.get()
+        return self.logs[href].append
+
     def version_str(self):
-        return 'aiohttp/%s' % aiohttp.__version__
+        return 'aiohttp/%s; multi' % aiohttp.__version__
+
+    async def check_bookmark_async_log(self, bookmark):
+        current_href.set(bookmark.href)
+        await self.check_bookmark_async(bookmark)
+
+    async def get_url(self, url, req_headers):
+        if url not in self.logs:
+            self.logs[url] = []
+        current_href.set(url)
+        return await super(robot_aio, self).get_url(url, req_headers)
 
     async def get(self, url, req_headers, use_proxy=False):
         if url.startswith('ftp://'):
@@ -50,9 +86,59 @@ class robot_aiohttp(robot_base):
         )
         return error, status, resp_headers, body
 
+    def wait(self):
+        self.loop.run_until_complete(self.wait_async())
+
+    async def wait_async(self):
+        bookmarks = self.bookmarks
+        pending = self.pending
+
+        free_workers = self.max_urls - len(pending)
+        if bookmarks and (free_workers > 0):  # we have job and free workers
+            for href in bookmarks:
+                bookmark, _, task = bookmarks[href]
+                if task is not None:  # it's already pending
+                    continue
+                task = asyncio.create_task(
+                    self.check_bookmark_async_log(bookmark))
+                bookmarks[href] = [bookmark, None, task]
+                if href not in self.logs:
+                    self.logs[href] = []
+                pending.add(task)
+                task.href = href
+
+                free_workers -= 1
+                if free_workers == 0:
+                    break
+
+        if pending:
+            done, pending = await asyncio.wait(
+                pending, timeout=self.timeout+1,
+                return_when=asyncio.FIRST_COMPLETED)
+        self.pending = pending
+
+        for task in done:
+            bookmark, _, old_task = bookmarks.pop(task.href)
+            assert old_task is task
+            log = self.file_log
+            log_lines = self.logs.pop(bookmark.href)
+            log('Checked: %s' % bookmark.href)
+            if log_lines:
+                for line in log_lines:
+                    log(line)
+            else:
+                if hasattr(bookmark, 'error'):
+                    log('   Error: %s' % bookmark.error)
+                else:
+                    log('   No logs')
+
     def get_ftp_welcome(self):
         return ''  # We don't store welcome message yet
 
+    def stop(self):
+        super(robot_aio, self).stop()
+        self.loop.close()
+
 
 async def _get_http(url, req_headers={}, proxy=None, timeout=60):
     connector = None
@@ -69,6 +155,7 @@ async def _get_http(url, req_headers={}, proxy=None, timeout=60):
             username=pusername, password=ppassword, rdns=rdns,
         )
         proxy = None
+
     timeout = aiohttp.ClientTimeout(connect=timeout, total=timeout)
     ssl_context = create_urllib3_context(cert_reqs=0,
                                          ciphers='ALL:@SECLEVEL=1')
diff --git a/Robots/bkmk_rmultiaio.py b/Robots/bkmk_rmultiaio.py
deleted file mode 100644 (file)
index 19f3062..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-"""Robot based on aiohttp, processes multiple URLs in parallel
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman <phd@phdru.name>"
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['robot_multiaio']
-
-
-from Robots.bkmk_raiohttp import robot_aiohttp
-from Robots.multi_async_mixin import multi_async_mixin
-
-
-class robot_multiaio(multi_async_mixin, robot_aiohttp):
-    def __init__(self, *args, **kw):
-        multi_async_mixin.__init__(self, *args, **kw)
-        robot_aiohttp.__init__(self, *args, **kw)
-        self._init()
-
-    def version_str(self):
-        return super(robot_multiaio, self).version_str() + ' multi: aiohttp'
diff --git a/Robots/multi_async_mixin.py b/Robots/multi_async_mixin.py
deleted file mode 100644 (file)
index 7056979..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-"""Mix-in for async robots that process multiple URLs in parallel.
-
-This file is a part of Bookmarks database and Internet robot.
-
-"""
-
-__author__ = "Oleg Broytman <phd@phdru.name>"
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-
-__all__ = ['multi_async_mixin']
-
-
-import asyncio
-import contextvars
-
-from Robots.multi_mixin import multi_mixin
-
-
-current_href = contextvars.ContextVar('current_href')
-
-
-class multi_async_mixin(multi_mixin):
-    def _init(self):
-        # We need one event loop for the entire application
-        # so that we can save pending tasks between calls to self.wait().
-        # This also means we cannot use asyncio.run().
-        self.loop = loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-
-        # Rename self.log, create one log_lines list per URL
-        self.file_log = self.log
-        del self.log
-        self.logs = {}  # Map {href: [log lines]}
-
-    def __getattr__(self, attr):
-        if attr != 'log':
-            raise AttributeError(attr)
-        href = current_href.get()
-        return self.logs[href].append
-
-    async def check_bookmark_async_log(self, bookmark):
-        current_href.set(bookmark.href)
-        await self.check_bookmark_async(bookmark)
-
-    async def get_url(self, url, req_headers):
-        if url not in self.logs:
-            self.logs[url] = []
-        current_href.set(url)
-        return await super(multi_async_mixin, self).get_url(url, req_headers)
-
-    def wait(self):
-        self.loop.run_until_complete(self.wait_async())
-
-    async def wait_async(self):
-        bookmarks = self.bookmarks
-        pending = self.pending
-
-        free_workers = self.max_urls - len(pending)
-        if bookmarks and (free_workers > 0):  # we have job and free workers
-            for href in bookmarks:
-                bookmark, _, task = bookmarks[href]
-                if task is not None:  # it's already pending
-                    continue
-                task = asyncio.create_task(
-                    self.check_bookmark_async_log(bookmark))
-                bookmarks[href] = [bookmark, None, task]
-                if href not in self.logs:
-                    self.logs[href] = []
-                pending.add(task)
-                task.href = href
-
-                free_workers -= 1
-                if free_workers == 0:
-                    break
-
-        if pending:
-            done, pending = await asyncio.wait(
-                pending, timeout=self.timeout+1,
-                return_when=asyncio.FIRST_COMPLETED)
-        self.pending = pending
-
-        for task in done:
-            bookmark, _, old_task = bookmarks.pop(task.href)
-            assert old_task is task
-            log = self.file_log
-            log_lines = self.logs.pop(bookmark.href)
-            log('Checked: %s' % bookmark.href)
-            if log_lines:
-                for line in log_lines:
-                    log(line)
-            else:
-                if hasattr(bookmark, 'error'):
-                    log('   Error: %s' % bookmark.error)
-                else:
-                    log('   No logs')
-
-    def stop(self):
-        super(multi_async_mixin, self).stop()
-        self.loop.close()
index 8d6e40e023d2b883851a7f35bae4aa455d6e0778..14fda05c99174dbef2610053a10d50ee9740d0f5 100644 (file)
@@ -9,6 +9,8 @@ WHAT'S NEW
 
 Version 6.1.0 (2024-??-??)
 
+   Combine aiohttp with multiaio; the combined robot is named just aio.
+
    Make bkmk_rmultirequests always multiprocess.
 
 Version 6.0.0 (2024-08-19)
index bb90fc8cdc84edf74cfc3d7b6ce445b063138cb4..ad7fe9ce79cbf9cdd46b5097dcae9168c4f32ce6 100644 (file)
@@ -1,5 +1,7 @@
 Version 6.1.0 (2024-??-??)
 
+   Combine aiohttp with multiaio; the combined robot is named just aio.
+
    Make bkmk_rmultirequests always multiprocess.
 
 Version 6.0.0 (2024-08-19)