curl, multiaio, twisted.
+++ /dev/null
-"""Robot based on curl_multi and concurrent.futures,
-processes multiple URLs in parallel (multithreaded).
-This file is a part of Bookmarks database and Internet robot.
-__author__ = "Oleg Broytman <>"
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-__all__ = ['robot_curl']
-from time import sleep
-import certifi
-import pycurl
-from Robots.base import encode_url
-from Robots.concurrent_futures import cf_multithread
-class robot_curl(cf_multithread):
- def version_str(self):
- return super(cf_multithread, self).version_str() \
- + '; ' + str(pycurl.version)
- def main_thread(self):
- """Main loop: run curl_multi"""
- _curl_multi = pycurl.CurlMulti()
- _curl_multi.wrappers = {}
- while True:
- if self.queue.empty():
- pass
- else:
- request = self.queue.get_nowait()
- if request is None: # Signal to stop
- return
- url, req_headers, use_proxy, queue = request
- cw = CurlWrapper(self, url, req_headers, use_proxy)
- _curl_multi.wrappers[cw.curl] = (cw, queue)
- _curl_multi.add_handle(cw.curl)
- while True:
- ret, num_handles = _curl_multi.perform()
- _, completed, failed = _curl_multi.info_read()
- for c in completed:
- _curl_multi.remove_handle(c)
- cw, queue = _curl_multi.wrappers.pop(c)
- queue.put_nowait(
- (None, cw.getinfo(pycurl.HTTP_CODE),
- cw.resp_headers, cw.body)
- )
- for c, errcode, errmsg in failed:
- _curl_multi.remove_handle(c)
- cw, queue = _curl_multi.wrappers.pop(c)
- queue.put_nowait(('Error: %d %s' % (errcode, errmsg),
- None, None, None))
- if ret != pycurl.E_CALL_MULTI_PERFORM:
- break
- if num_handles == 0:
- sleep(1)
-class CurlWrapper:
- def __init__(self, robot, url, req_headers, use_proxy):
- req_headers = ['%s: %s' % (k, v) for k, v in req_headers.items()]
- self.curl = curl = pycurl.Curl()
- self.resp_headers = {}
- self.body = b''
- # Do not follow redirects
- curl.setopt(pycurl.FOLLOWLOCATION, 0)
- # Lower security settings - we need to get as musch as possible
- curl.setopt(pycurl.SSL_CIPHER_LIST, 'ALL:@SECLEVEL=1')
- curl.setopt(pycurl.SSL_VERIFYHOST, 0)
- curl.setopt(pycurl.SSL_VERIFYPEER, 0)
- curl.setopt(curl.CAINFO, certifi.where())
- # Set timeouts to avoid hanging too long
- curl.setopt(pycurl.CONNECTTIMEOUT, robot.timeout)
- curl.setopt(pycurl.TIMEOUT, robot.timeout)
- # Parse Last-Modified
- curl.setopt(pycurl.OPT_FILETIME, 1)
- if use_proxy:
- curl.setopt(pycurl.PROXY, robot.proxy)
- # Set up a callback to capture the headers and the body
- curl.setopt(pycurl.HEADERFUNCTION, self.header_callback)
- curl.setopt(pycurl.WRITEFUNCTION, self.body_callback)
- curl.setopt(pycurl.HTTPGET, 1)
- curl.setopt(pycurl.HTTPHEADER, req_headers)
- try:
- url.encode('ascii')
- except UnicodeEncodeError:
- url = encode_url(url)
- curl.setopt(pycurl.URL, url)
- def __getattr__(self, attr):
- return getattr(self.curl, attr)
- def header_callback(self, data):
- for encoding in 'ascii', 'latin1', 'utf-8':
- try:
- data = data.decode(encoding)
- except UnicodeDecodeError:
- pass
- else:
- break
- else:
- print("Error decoding header:", data)
- return
- if ':' in data:
- key, value = data.split(':', 1)
- self.resp_headers[key.title()] = value.strip()
- def body_callback(self, data):
- self.body += data
+++ /dev/null
-"""Robot based on aiohttp and concurrent.futures,
-processes multiple URLs in parallel (multithreaded).
-This file is a part of Bookmarks database and Internet robot.
-__author__ = "Oleg Broytman <>"
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-__all__ = ['robot_multiaio']
-import asyncio
-import aiohttp
-from Robots.bkmk_raio import _get_http, _get_ftp
-from Robots.concurrent_futures import cf_multithread
-class robot_multiaio(cf_multithread):
- def __init__(self, *args, **kw):
- self.async_pending = set() # pending async tasks
- cf_multithread.__init__(self, *args, **kw)
- def version_str(self):
- return 'aiohttp/%s; multithreaded' % aiohttp.__version__
- def main_thread(self):
- async def main_thread_async(self):
- """Main loop"""
- while True:
- if self.queue.empty():
- pass
- else:
- request = self.queue.get_nowait()
- if request is None: # Signal to stop
- return
- url, req_headers, use_proxy, queue = request
- task = asyncio.create_task(
- self.get_url_task(url, req_headers, use_proxy, queue))
- self.async_pending.add(task)
- if self.async_pending:
- done, async_pending = await asyncio.wait(
- self.async_pending, timeout=self.timeout,
- return_when=asyncio.FIRST_COMPLETED)
- self.async_pending = async_pending
- for task in done:
- error, status, resp_headers, body, queue = task.result()
- queue.put_nowait((error, status, resp_headers, body))
- async def get_url_task(self, url, req_headers, use_proxy, queue):
- if url.startswith('ftp://'):
- error, body = await _get_ftp(
- url, timeout=self.timeout,
- )
- if error is not None:
- error = str(error)
- return error, None, None, None, queue
- return None, None, None, body, queue
- if use_proxy:
- proxy = self.proxy
- else:
- proxy = None
- error, status, resp_headers, body = await _get_http(
- url, req_headers, proxy=proxy,
- timeout=self.timeout,
- )
- return error, status, resp_headers, body, queue
+++ /dev/null
-"""Robot based on twisted and concurrent.futures,
-processes multiple URLs in parallel (multithreaded).
-This file is a part of Bookmarks database and Internet robot.
-__author__ = "Oleg Broytman <>"
-__copyright__ = "Copyright (C) 2024 PhiloSoft Design"
-__license__ = "GNU GPL"
-__all__ = ['robot_twisted']
-from urllib.parse import urlsplit
-from time import sleep
-from twisted import __version__
-from twisted.internet import reactor
-from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.web.client import Agent, ProxyAgent, readBody
-from twisted.web.http_headers import Headers
-from Robots.base import encode_url
-from Robots.concurrent_futures import cf_multithread
-from twisted.internet import _sslverify
-_sslverify.platformTrust = lambda: None
-class robot_twisted(cf_multithread):
- def __init__(self, *args, **kw):
- cf_multithread.__init__(self, *args, **kw)
- self.executor.submit(, installSignalHandlers=False)
- def version_str(self):
- return super(cf_multithread, self).version_str() \
- + '; Twisted ' + __version__
- def cbRequest(self, response, queue, timeoutCall):
- if
- timeoutCall.cancel()
- d = readBody(response)
- d.addCallback(self.cbBody, response, queue)
- return d
- def cbBody(self, body, response, queue):
- queue.put_nowait(
- (None, response.code,
- {k.decode('ascii').title(): v[0].decode('ascii')
- for k, v in response.headers.getAllRawHeaders()},
- body)
- )
- def cbError(self, failure, queue, timeoutCall):
- if
- timeoutCall.cancel()
- queue.put_nowait(('Error: %s' % failure,
- None, None, None))
- def cancelTimeout(self, passthrough, timeoutCall):
- if
- timeoutCall.cancel()
- return passthrough
- def main_thread(self):
- """Main loop: create twisted agent and HTTP queries"""
- direct_agent = Agent(reactor, connectTimeout=self.timeout)
- if self.proxy and self.proxy.startswith('http'):
- proxy = urlsplit(self.proxy)
- endpoint = TCP4ClientEndpoint(
- reactor, proxy.hostname, proxy.port, timeout=self.timeout)
- proxy_agent = ProxyAgent(endpoint)
- while True:
- if self.queue.empty():
- pass
- else:
- request = self.queue.get_nowait()
- if request is None: # Signal to stop
- reactor.stop()
- return
- url, req_headers, use_proxy, queue = request
- try:
- url.encode('ascii')
- except UnicodeEncodeError:
- url = encode_url(url)
- req_headers = {k: [v] for k, v in req_headers.items()}
- if use_proxy:
- agent = proxy_agent
- else:
- agent = direct_agent
- try:
- d = agent.request(b'GET', url.encode('ascii'),
- Headers(req_headers))
- except Exception as e:
- queue.put_nowait(('Error: %s' % e,
- None, None, None))
- continue
- # Setup timeout watch
- timeoutCall = reactor.callLater(self.timeout, d.cancel)
- d.addBoth(self.cancelTimeout, timeoutCall)
- d.addCallback(self.cbRequest, queue, timeoutCall)
- d.addErrback(self.cbError, queue, timeoutCall)
- sleep(0.1)
def __init__(self, *args, **kw):
self.concurrent_class = concurrent.futures.ProcessPoolExecutor
concurrent_futures.__init__(self, *args, **kw)
-class cf_multithread(concurrent_futures):
- def __init__(self, *args, **kw):
- self.concurrent_class = concurrent.futures.ThreadPoolExecutor
- concurrent_futures.__init__(self, *args, **kw)
- # Rename self.log, create one log_lines list per URL (thread)
- self.file_log = self.log
- del self.log
- self.local = threading.local()
- # Queue to pass requests to the main loop
- self.queue = Queue()
- # Main loop
- self.executor.submit(self.main_thread)
- def __getattr__(self, attr):
- if attr != 'log':
- raise AttributeError(attr)
- try:
- return self.local.log.append
- except AttributeError:
- return self.file_log
- def check_bkmk_task(self):
- return self.check_bkmk_thread
- def check_bkmk_thread(self, bookmark):
- self.local.log = log = []
- robot_base.check_bookmark(self, bookmark)
- return bookmark, log
- async def get(self, url, req_headers, use_proxy=False):
- # Queue to wait results
- queue = Queue(maxsize=1)
- self.queue.put((url, req_headers, use_proxy, queue))
- return queue.get()
- def wait(self):
- super(cf_multithread, self).wait()
- if not self.bookmarks and not self.pending:
- self.queue.put(None) # Signal to stop
. bkmk_db-venv/bin/activate &&
pip install --compile --upgrade setuptools \
beautifulsoup4 lxml m_lib.full \
- "requests[socks]" pycurl certifi \
- aiohttp aiohttp-socks "aioftp[socks]" \
- twisted pyOpenSSL service-identity
+ "requests[socks]" \
+ aiohttp aiohttp-socks "aioftp[socks]"
Version 6.2.0 (2024-??-??)
- Robot based on aiohttp and concurrent.futures,
- processes multiple URLs in parallel (multithreaded).
- Works slowly, the same way as curl.
- Default list of robots is multirequests,aio.
- Robot based on twisted and concurrent.futures,
- processes multiple URLs in parallel (multithreaded).
- Doesn't properly support proxies; has problems with HTTP proxy
- and doesn't support SOCKS5 proxy at all.
- Doesn't query FTP; requires more work.
Robots: Removed ftp_timeout.
Version 6.2.0 (2024-??-??)
- Robot based on aiohttp and concurrent.futures,
- processes multiple URLs in parallel (multithreaded).
- Works slowly, the same way as curl.
- Default list of robots is multirequests,aio.
- Robot based on twisted and concurrent.futures,
- processes multiple URLs in parallel (multithreaded).
- Doesn't properly support proxies; has problems with HTTP proxy
- and doesn't support SOCKS5 proxy at all.
- Doesn't query FTP; requires more work.
Robots: Removed ftp_timeout.
Version 6.1.0 (2024-09-08)
'html': ['beautifulsoup4', 'lxml'],
'requests': ['requests[socks]'],
- 'curl': ['pycurl', 'certifi'],
'aiohttp': ['aiohttp>=3', 'aiohttp-socks', 'aioftp[socks]'],
- 'twisted': ['twisted', 'pyOpenSSL', 'service-identity'],