# -*- coding: utf-8 -*-
"""Source Parsing

The :mod:`darc.parse` module provides auxiliary functions
to read ``robots.txt``, sitemaps and HTML documents. It
also contains utility functions to check if the proxy type,
hostname and content type are in any of the black and white
lists.

"""

import concurrent.futures
import os

import bs4
import magic
import requests
import stem.util.term

import darc.typing as typing
from darc._compat import RobotFileParser
from darc.const import (CHECK_NG, LINK_BLACK_LIST, LINK_FALLBACK, LINK_WHITE_LIST, MIME_BLACK_LIST,
                        MIME_FALLBACK, MIME_WHITE_LIST, PROXY_BLACK_LIST, PROXY_FALLBACK, PROXY_WHITE_LIST)
from darc.error import render_error
from darc.link import Link, parse_link, urljoin

def match_proxy(proxy: str) -> bool:
    """Tell whether a proxy type is black-listed.

    Args:
        proxy: Proxy type to be checked.

    Returns:
        :data:`True` if ``proxy`` is to be treated as black-listed.

    Note:
        The ``script`` proxy type is always reported as black-listed.
        Otherwise the black list is consulted before the white list,
        and a proxy type found in neither gets the fallback value.

    See Also:
        * :data:`darc.const.PROXY_WHITE_LIST`
        * :data:`darc.const.PROXY_BLACK_LIST`
        * :data:`darc.const.PROXY_FALLBACK`

    """
    # ``script`` is rejected unconditionally; a black list hit wins over
    # whatever the white list may say
    if proxy == 'script' or proxy in PROXY_BLACK_LIST:
        return True
    # white-listed types pass, everything else takes the fallback
    return False if proxy in PROXY_WHITE_LIST else PROXY_FALLBACK
def match_host(host: str) -> bool:
    """Tell whether a hostname is black-listed.

    Args:
        host: Hostname to be checked.

    Returns:
        :data:`True` if ``host`` is to be treated as black-listed.

    Note:
        A ``host`` of :data:`None` is always reported as black-listed.
        Otherwise the black list patterns are tried before the white
        list patterns (each must match the *whole* hostname), and a
        hostname matching neither gets the fallback value.

    See Also:
        * :data:`darc.const.LINK_WHITE_LIST`
        * :data:`darc.const.LINK_BLACK_LIST`
        * :data:`darc.const.LINK_FALLBACK`

    """
    # a missing hostname is invalid and rejected outright
    if host is None:
        return True

    # black list takes precedence
    for regex in LINK_BLACK_LIST:
        if regex.fullmatch(host) is not None:
            return True

    # then the white list
    for regex in LINK_WHITE_LIST:
        if regex.fullmatch(host) is not None:
            return False

    # listed nowhere
    return LINK_FALLBACK
def match_mime(mime: str) -> bool:
    """Tell whether a content type is black-listed.

    Args:
        mime: Content type to be checked.

    Returns:
        :data:`True` if ``mime`` is to be treated as black-listed.

    Note:
        The black list patterns are tried before the white list
        patterns (each must match the *whole* content type), and a
        content type matching neither gets the fallback value.

    See Also:
        * :data:`darc.const.MIME_WHITE_LIST`
        * :data:`darc.const.MIME_BLACK_LIST`
        * :data:`darc.const.MIME_FALLBACK`

    """
    def _listed(patterns) -> bool:
        """Check if any of ``patterns`` fully matches ``mime``."""
        return any(regex.fullmatch(mime) is not None for regex in patterns)

    # black list takes precedence over the white list
    if _listed(MIME_BLACK_LIST):
        return True
    if _listed(MIME_WHITE_LIST):
        return False
    # listed nowhere
    return MIME_FALLBACK
def check_robots(link: Link) -> bool:
    """Check if ``link`` is allowed in ``robots.txt``.

    The ``robots.txt`` file is looked up under ``link.base`` on the
    local file system; when no such file exists, the link is allowed.

    Args:
        link: The link object to be checked.

    Returns:
        If ``link`` is allowed in ``robots.txt``.

    Note:
        The root path of a URL will always return :data:`True`.

    """
    # the root path is never subject to ``robots.txt`` rules
    if link.url_parse.path in ('', '/'):
        return True

    robots_path = os.path.join(link.base, 'robots.txt')
    # nothing stored for this site: everything is allowed
    if not os.path.isfile(robots_path):
        return True

    parser = RobotFileParser()
    with open(robots_path) as robots_file:
        parser.parse(robots_file)

    # imported lazily (presumably to avoid a circular import -- TODO confirm)
    from darc.requests import default_user_agent  # pylint: disable=import-outside-toplevel

    user_agent = default_user_agent()
    return parser.can_fetch(user_agent, link.url)
def _check_ng(temp_list: typing.List[Link]) -> typing.List[Link]:
    """Check content type of links through ``HEAD`` requests.

    Links whose hostname or proxy type is black-listed are dropped
    first. For the remaining links a ``HEAD`` request is issued
    (following redirects), and links whose content type is black-listed
    are dropped as well.

    Args:
        temp_list: List of links to be checked.

    Returns:
        List of links matches the requirements.

    Note:
        * A request that failed *without* any response is dropped.
        * A request that failed *with* a response is kept, since its
          content type could not be judged; the URL of that response is
          parsed into a link object before being returned.

    See Also:
        * :func:`darc.parse.match_host`
        * :func:`darc.parse.match_proxy`
        * :func:`darc.parse.match_mime`

    """
    from darc.crawl import request_session  # pylint: disable=import-outside-toplevel

    # one session per proxy type, created on first use
    session_map = dict()
    # pending ``HEAD`` requests (consumed through ``as_completed`` below)
    result_list = list()
    for link in temp_list:
        if match_host(link.host):
            continue
        if match_proxy(link.proxy):
            continue

        # get session
        session = session_map.get(link.proxy)
        if session is None:
            session = request_session(link, futures=True)
            session_map[link.proxy] = session

        result = session.head(link.url, allow_redirects=True)
        result_list.append(result)

        print(f'[HEAD] Checking content type from {link.url}')

    link_list = list()
    for result in concurrent.futures.as_completed(result_list):
        try:
            response: typing.Response = result.result()
        except requests.RequestException as error:
            if error.response is None:
                print(render_error(f'[HEAD] Checking failed <{error}>',
                                   stem.util.term.Color.RED))  # pylint: disable=no-member
                continue
            print(render_error(f'[HEAD] Failed on {error.response.url} <{error}>',
                               stem.util.term.Color.RED))  # pylint: disable=no-member
            # keep the link, but as a link object like every other entry
            # of the returned list (not as a bare URL string)
            link_list.append(parse_link(error.response.url))
            continue
        ct_type = get_content_type(response)

        print(f'[HEAD] Checked content type from {response.url} ({ct_type})')

        if match_mime(ct_type):
            continue
        temp_link = parse_link(response.request.url)
        link_list.append(temp_link)
    return link_list
def _check(temp_list: typing.List[Link]) -> typing.List[Link]:
    """Check hostname and proxy type of links.

    Args:
        temp_list: List of links to be checked.

    Returns:
        List of links matches the requirements, i.e. those whose
        hostname and proxy type are both *not* black-listed, in their
        original order.

    Note:
        If :data:`~darc.const.CHECK_NG` is :data:`True`, the function will
        directly call :func:`~darc.parse._check_ng` instead.

    See Also:
        * :func:`darc.parse.match_host`
        * :func:`darc.parse.match_proxy`

    """
    if CHECK_NG:
        return _check_ng(temp_list)

    # the hostname is tested first; the proxy type is only looked at
    # for links whose hostname passed
    return [
        link for link in temp_list
        if not match_host(link.host) and not match_proxy(link.proxy)
    ]
def get_content_type(response: typing.Response) -> str:
    """Get content type from ``response``.

    Args:
        response (:class:`requests.Response`): Response object.

    Returns:
        The content type from ``response``, case-folded and stripped of
        any parameters (everything from the first ``;`` on) and of
        surrounding whitespace.

    Note:
        If the ``Content-Type`` header is not defined in ``response``,
        the function will utilise ``magic`` to detect its content type
        from the response body. Should that detection fail for whatever
        reason, the placeholder ``(null)`` is returned.

    """
    header = response.headers.get('Content-Type')
    if header is None:
        # no header at all: sniff the body instead; this is best-effort,
        # so any failure degrades to a placeholder rather than raising
        try:
            header = magic.detect_from_content(response.content).mime_type
        except Exception:
            header = '(null)'

    # keep only the media type, e.g. ``text/html`` out of
    # ``Text/HTML; charset=utf-8``
    mime, *_ = header.casefold().split(';', maxsplit=1)
    return mime.strip()