import asyncio
import cgi
import logging
import os
from asyncio import Semaphore
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, Set
from urllib.parse import urldefrag, urljoin, urlsplit, urlunsplit
from aiohttp import ClientSession, ClientTimeout
from bs4 import BeautifulSoup
__all__ = ('Crawler', 'crawl')
_log = logging.getLogger(__name__.split('.')[0])
class Crawler:
    RATE_LIMIT = 50
    MAX_WORKERS = os.cpu_count()
    ALLOWED_SCHEMES = ('http', 'https')
    def __init__(
        self,
        loop=None,  # not sure which type annotation to use here
        *,
        session: Optional[ClientSession] = None,
        semaphore: Optional[Semaphore] = None,
        executor: Optional[ThreadPoolExecutor] = None
    ):
        self.loop = loop or asyncio.get_event_loop()
        # DeprecationWarning: The object should be created from async function
        # (more on this in the side note right after the script)
        self.session = session or ClientSession(loop=self.loop)
        self.semaphore = semaphore or Semaphore(self.RATE_LIMIT, loop=self.loop)
        self.executor = executor or ThreadPoolExecutor(max_workers=self.MAX_WORKERS)
    async def crawl(
        self,
        url: str,
        cb: Callable,
        *,
        depth: int = 3,
        headers: Optional[dict] = None,
        timeout: float = 15.0,
    ):
        timeout = ClientTimeout(total=timeout)
        url = self._normalize_url(url)
        await self._fetch(url, cb, depth, headers or {}, timeout, set())
    async def _fetch(
        self,
        url: str,
        cb: Callable,
        depth: int,
        headers: dict,
        timeout: ClientTimeout,
        seen: Set[str],
    ):
        if url not in seen:
            async with self.semaphore:
                try:
                    _log.debug('url=%s depth=%s', url, depth)
                    async with self.session.get(
                        url,
                        headers=headers,
                        timeout=timeout,
                        # allow_redirects=False,
                        ssl=False,
                    ) as r:
                        # Do I need to normalize again here, or are the scheme
                        # and netloc of r.url already lowercase?
                        url = self._normalize_url(str(r.url))
                        seen.add(url)
                        res = cb(url)
                        if asyncio.iscoroutine(res):
                            await res
                        # .get() so this does not blow up when the response
                        # has no Content-Type header at all
                        ct, _ = cgi.parse_header(r.headers.get('Content-Type', ''))
                        if ct == 'text/html' and depth > 1:
                            html = await r.text()
                            links = await self.loop.run_in_executor(
                                self.executor,
                                self._extract_links,
                                html,
                                url
                            )
                            # _log.debug(links)
                            tasks = (
                                self._fetch(link, cb, depth - 1, headers, timeout, seen)
                                for link in links
                            )
                            await asyncio.gather(*tasks)
                except Exception as e:
                    _log.warning(e)
    def _extract_links(self, html: str, base_url: str) -> set:
        soup = BeautifulSoup(html, 'lxml')
        base = soup.find('base', href=True)
        if base:
            # <base href> may itself be relative, so resolve it against the page URL
            base_url = urljoin(base_url, base['href'])
        rv = set()
        for link in soup.find_all('a', href=True):
            url = urljoin(base_url, link['href'])
            if urlsplit(url).scheme in self.ALLOWED_SCHEMES:
                rv.add(self._normalize_url(url))
        return rv
    def _normalize_url(self, url: str) -> str:
        scheme, netloc, path, query, _ = urlsplit(url)
        return urlunsplit((scheme, netloc.lower(), path, query, ''))
async def crawl(*args, **kwargs):
    crawler = Crawler()
    try:
        await crawler.crawl(*args, **kwargs)
    finally:
        # close the session so aiohttp does not warn about it being left open
        await crawler.session.close()
if __name__ == '__main__':
    from argparse import ArgumentParser
    logging.basicConfig(level=logging.DEBUG)
    parser = ArgumentParser()
    parser.add_argument('url', help='URL')
    parser.add_argument('-d', '--depth', default=3, type=int, help='Depth')
    parser.add_argument('-H', '--header', default=[], action='append', help='Header: Value')
    parser.add_argument('-t', '--timeout', default=15.0, type=float, help='Timeout')
    args = parser.parse_args()
    _log.debug(args.header)
    # split on the first colon only, header values may themselves contain ':'
    headers = dict(map(str.strip, v.split(':', 1)) for v in args.header)
    asyncio.get_event_loop().run_until_complete(crawl(
        args.url,
        print,
        depth=args.depth,
        headers=headers,
        timeout=args.timeout,
    ))
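Before I get to the actual problem, a side note about the DeprecationWarning mentioned in the __init__ comment ("The object should be created from async function"). My guess, and it is only a guess, is that the session should be created lazily from inside a coroutine rather than in __init__. A rough, untested sketch of what I mean (the class and method names here are mine, not anything from aiohttp):

import asyncio
from typing import Optional
from aiohttp import ClientSession

class LazySessionCrawler:
    def __init__(self) -> None:
        # nothing touches the event loop here, so nothing to warn about at construction time
        self._session: Optional[ClientSession] = None

    async def _get_session(self) -> ClientSession:
        # the ClientSession is created on first use, while the loop is already running
        if self._session is None:
            self._session = ClientSession()
        return self._session

    async def fetch_text(self, url: str) -> str:
        session = await self._get_session()
        async with session.get(url) as r:
            return await r.text()

    async def close(self) -> None:
        if self._session is not None:
            await self._session.close()

async def _demo() -> None:
    crawler = LazySessionCrawler()
    try:
        print(len(await crawler.fetch_text('http://habr.com/')))
    finally:
        await crawler.close()

if __name__ == '__main__':
    asyncio.run(_demo())

Is that the intended pattern, or should the session always be created and closed by the caller and passed in? Either way, on to the real issue.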
I run the script like this:
python crawler.py http://habr.com/ -H "User-Agent: Mozilla/5.0"
It gets down to the second level of links and then just hangs. I honestly have no idea what to do next. Is there anyone here who knows asyncio in depth?
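The only theory I have come up with so far, and it may be completely wrong: every _fetch holds a semaphore permit for its entire lifetime, including while it awaits its children in asyncio.gather, and those children need permits of their own. Once all 50 permits are held by fetches that are themselves waiting on deeper fetches, nothing can make progress. Here is a tiny standalone toy (my own model of the situation, not the crawler itself) that hangs the same way with a limit of 2 and a fan-out of 2:

import asyncio

async def fetch(sem: asyncio.Semaphore, depth: int) -> None:
    # the permit is held for the whole call, including while we wait
    # for the children spawned below
    async with sem:
        if depth == 0:
            return
        await asyncio.gather(*(fetch(sem, depth - 1) for _ in range(2)))

async def main() -> None:
    sem = asyncio.Semaphore(2)
    # the two parent calls end up holding both permits while waiting
    # for children that can never acquire one
    await asyncio.wait_for(fetch(sem, depth=5), timeout=5)

if __name__ == '__main__':
    asyncio.run(main())  # times out after 5 s instead of completing

If that is what is happening in my crawler (a habr.com page seems to have far more than 50 links, so the permits would run out while the depth-3 and depth-2 fetches are still gathering), what is the right way to structure this: release the semaphore before recursing, switch to a queue of URLs with a fixed pool of worker tasks, or something else entirely? Any pointers appreciated.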