# -*- coding: utf-8 -*-
# pylint: disable=ungrouped-imports
"""I2P Proxy
===============
The :mod:`darc.proxy.i2p` module contains the auxiliary functions
around managing and processing the I2P proxy.
"""
import base64
import getpass
import os
import platform
import re
import shlex
import signal
import subprocess # nosec: B404
from typing import TYPE_CHECKING, cast
import requests
import selenium.webdriver.common.proxy as selenium_proxy
from darc.const import CHECK, DARC_USER, DEBUG, PATH_DB
from darc.error import I2PBootstrapFailed, UnsupportedPlatform
from darc.link import parse_link, urljoin
from darc.logging import DEBUG as LOG_DEBUG
from darc.logging import ERROR as LOG_ERROR
from darc.logging import INFO as LOG_INFO
from darc.logging import VERBOSE as LOG_VERBOSE
from darc.logging import WARNING as LOG_WARNING
from darc.logging import logger
from darc.parse import _check, get_content_type
if TYPE_CHECKING:
from io import IO # type: ignore[attr-defined] # pylint: disable=no-name-in-module
from signal import Signals # pylint: disable=no-name-in-module
from subprocess import Popen # nosec: B404
from types import FrameType
from typing import List, NoReturn, Optional, Union
import darc.link as darc_link # Link
from darc._typing import File
# Extra command-line arguments appended to the I2P launch command, parsed
# shell-style from the ``I2P_ARGS`` environment variable (empty by default).
I2P_ARGS = shlex.split(os.environ.get('I2P_ARGS', ''))

# Seconds to wait for the I2P router to report a successful start
# (``I2P_WAIT``, defaults to 90).
BS_WAIT = float(os.environ.get('I2P_WAIT', '90'))

# Local port of the I2P HTTP proxy (``I2P_PORT``, defaults to 4444); kept as a
# string because it is only ever interpolated into proxy URLs.
I2P_PORT = os.environ.get('I2P_PORT', '4444')

# Number of *additional* bootstrap attempts after the first one fails
# (``I2P_RETRY``, defaults to 3).
I2P_RETRY = int(os.environ.get('I2P_RETRY', '3'))

# Proxy mapping in the format expected by :mod:`requests`; both schemes are
# routed through the same local HTTP proxy.
# c.f. https://stackoverflow.com/a/42972942
I2P_REQUESTS_PROXY = dict.fromkeys(('http', 'https'), f'http://localhost:{I2P_PORT}')
# Selenium proxy settings: a manually configured proxy that sends both plain
# HTTP and SSL traffic to the same ``localhost:I2P_PORT`` endpoint used by
# ``I2P_REQUESTS_PROXY``.
I2P_SELENIUM_PROXY = selenium_proxy.Proxy()
I2P_SELENIUM_PROXY.proxyType = selenium_proxy.ProxyType.MANUAL
I2P_SELENIUM_PROXY.http_proxy = f'http://localhost:{I2P_PORT}'
I2P_SELENIUM_PROXY.ssl_proxy = f'http://localhost:{I2P_PORT}'
# Whether darc itself is responsible for starting the I2P router; controlled
# by the ``DARC_I2P`` environment variable (any non-zero integer enables it,
# enabled by default).
_MNG_I2P = int(os.environ.get('DARC_I2P', '1')) != 0

# "Already bootstrapped" flag. When darc does not manage I2P there is nothing
# to bootstrap, so the flag starts out set.
_I2P_BS_FLAG = not _MNG_I2P

# Handle of the I2P launcher process; filled in once the bootstrap succeeds.
_I2P_PROC = None
# I2P bootstrap args
#
# When running as ``root`` the router is started on behalf of ``DARC_USER``
# through ``su``, which is only attempted on Linux and macOS; any other system
# is flagged as unsupported and gets an empty base command. Otherwise
# ``i2prouter`` is invoked directly as the current user.
_unsupported = False
if getpass.getuser() == 'root':
    _system = platform.system()
    if _system in ['Linux', 'Darwin']:
        _I2P_ARGS = ['su', '-', DARC_USER, 'i2prouter', 'start']
    else:
        _unsupported = True
        _I2P_ARGS = []
else:
    _I2P_ARGS = ['i2prouter', 'start']
# user-supplied extra arguments always go last
_I2P_ARGS.extend(I2P_ARGS)

# Report the resulting configuration. The banner names this module's proxy
# (I2P) so that its output is not mistaken for another proxy's in the logs.
if _unsupported:
    if DEBUG:
        logger.debug('-*- I2P PROXY -*-')
        logger.pline(LOG_ERROR, 'unsupported system: %s', platform.system())
        logger.pline(LOG_DEBUG, logger.horizon)
else:
    logger.plog(LOG_DEBUG, '-*- I2P PROXY -*-', object=_I2P_ARGS)
# Pattern for I2P hostnames: any run of characters followed by a literal
# ``.i2p`` suffix, compared case-insensitively. Intended for use with
# ``fullmatch`` so that the suffix must end the string.
I2P_REGEX = re.compile(r'.*\.i2p', flags=re.IGNORECASE)
def launch_i2p() -> 'Popen[bytes]':
    """Launch I2P process.

    Spawns the command in :data:`~darc.proxy.i2p._I2P_ARGS` and reads its
    combined stdout/stderr line by line until the launcher reports that the
    router is running (or was already running). The wait is bounded by
    :data:`~darc.proxy.i2p.BS_WAIT` seconds through a ``SIGALRM`` timer.

    Returns:
        The process object of the launched command.

    Raises:
        OSError: If the timeout elapses, or if the process closes its output
            before reporting success.

    Note:
        The function relies on ``SIGALRM``. The timer is always cancelled on
        exit and the ``SIGALRM`` handler that was active before the call is
        put back, so the caller's signal setup is left as it was found.

    See Also:
        This function mocks the behaviour of :func:`stem.process.launch_tor`.

    """
    def timeout_handler(signum: 'Optional[Union[int, Signals]]' = None,
                        frame: 'Optional[FrameType]' = None) -> 'NoReturn':
        raise OSError('reached a %i second timeout without success' % BS_WAIT)

    i2p_process = None
    previous_handler = None
    handler_installed = False
    try:
        i2p_process = subprocess.Popen(  # pylint: disable=consider-using-with # nosec
            _I2P_ARGS, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        )

        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
        handler_installed = True
        signal.setitimer(signal.ITIMER_REAL, BS_WAIT)

        stdout = cast('IO[bytes]', i2p_process.stdout)
        while True:
            raw_line = stdout.readline()
            # Only a truly empty read means end-of-stream; a blank line in the
            # output still carries its newline and must not abort the wait.
            if not raw_line:
                # message text kept unchanged for compatibility with existing logs
                raise OSError('Process terminated: Timed out')

            init_line = raw_line.decode('utf-8', 'replace').strip()
            logger.pline(LOG_VERBOSE, init_line)

            if 'running: PID:' in init_line:
                return i2p_process
            if 'I2P Service is already running.' in init_line:
                return i2p_process
    except BaseException:
        if i2p_process is not None:
            i2p_process.kill()  # don't leave a lingering process
            i2p_process.wait()
        raise
    finally:
        signal.alarm(0)  # stop alarm
        # ``signal.signal`` returns ``None`` for handlers not installed from
        # Python; those cannot be passed back in, so they are left alone.
        if handler_installed and previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
def _i2p_bootstrap() -> None:
    """I2P bootstrap.

    Starts the I2P process via :func:`~darc.proxy.i2p.launch_i2p` and, once
    that call has returned, records the process in
    :data:`~darc.proxy.i2p._I2P_PROC` and sets
    :data:`~darc.proxy.i2p._I2P_BS_FLAG`.

    Any exception raised by :func:`~darc.proxy.i2p.launch_i2p` propagates to
    the caller, in which case neither global is modified.

    See Also:
        * :func:`darc.proxy.i2p.i2p_bootstrap`
        * :func:`darc.proxy.i2p.launch_i2p`
        * :data:`darc.proxy.i2p._I2P_BS_FLAG`
        * :data:`darc.proxy.i2p._I2P_PROC`

    """
    global _I2P_PROC, _I2P_BS_FLAG  # pylint: disable=global-statement

    # launch first, so that a failure leaves the module state untouched
    process = launch_i2p()

    _I2P_PROC = process
    _I2P_BS_FLAG = True
def i2p_bootstrap() -> None:
    """Bootstrap wrapper for I2P.

    Calls :func:`~darc.proxy.i2p._i2p_bootstrap` until it succeeds, making at
    most ``I2P_RETRY + 1`` attempts in total. Each failed attempt is reported
    as an :exc:`I2PBootstrapFailed` warning; if every attempt fails the
    function still returns normally.

    Nothing is done when :data:`~darc.proxy.i2p._I2P_BS_FLAG` is already set.

    Warns:
        I2PBootstrapFailed: For each failed bootstrap attempt.

    Raises:
        :exc:`UnsupportedPlatform`: If the system was flagged as unsupported
            when the module was loaded.

    See Also:
        * :func:`darc.proxy.i2p._i2p_bootstrap`
        * :data:`darc.proxy.i2p.I2P_RETRY`
        * :data:`darc.proxy.i2p._I2P_BS_FLAG`

    """
    if _unsupported:
        raise UnsupportedPlatform(f'unsupported system: {platform.system()}')

    # don't re-bootstrap
    if _I2P_BS_FLAG:
        return

    logger.info('-*- I2P Bootstrap -*-')

    attempts_left = I2P_RETRY + 1
    while attempts_left > 0:
        attempts_left -= 1
        try:
            _i2p_bootstrap()
        except Exception:
            # the traceback is only of interest when debugging
            if DEBUG:
                logger.ptb('[Error bootstraping I2P proxy]')
            logger.pexc(LOG_WARNING, category=I2PBootstrapFailed, line='i2p_bootstrap()')
        else:
            break

    logger.pline(LOG_INFO, logger.horizon)
def get_hosts(link: 'darc_link.Link') -> 'Optional[File]':
    """Read ``hosts.txt``.

    Looks for ``hosts.txt`` directly under ``link.base`` and, if it is a
    regular file, returns its content packaged for submission.

    Args:
        link: Link object whose ``base`` directory is searched.

    Returns:
        * If ``hosts.txt`` exists, a dictionary with

          * ``path`` -- path of ``hosts.txt`` relative to
            :data:`~darc.const.PATH_DB`
          * ``data`` -- *base64* encoded content of ``hosts.txt``

        * If not, :data:`None`.

    See Also:
        * :func:`darc.proxy.i2p.save_hosts`

    """
    hosts_path = os.path.join(link.base, 'hosts.txt')
    if os.path.isfile(hosts_path):
        # read as bytes: the payload is base64-encoded, never decoded as text
        with open(hosts_path, 'rb') as stream:
            raw = stream.read()

        relative = os.path.relpath(hosts_path, PATH_DB)
        encoded = base64.b64encode(raw).decode()
        return {
            'path': relative,
            'data': encoded,
        }
    return None
def have_hosts(link: 'darc_link.Link') -> 'Optional[str]':
    """Check if ``hosts.txt`` already exists.

    Args:
        link: Link object whose ``base`` directory is checked.

    Returns:
        * If ``hosts.txt`` exists as a regular file under ``link.base``,
          the path to it.
        * If not, :data:`None`.

    """
    # <link.base>/hosts.txt
    candidate = os.path.join(link.base, 'hosts.txt')
    if os.path.isfile(candidate):
        return candidate
    return None
def save_hosts(link: 'darc_link.Link', text: str) -> str:
    """Save ``hosts.txt``.

    The file is written to ``<link.base>/hosts.txt``; missing parent
    directories are created. The first line is a ``#`` comment holding
    ``link.url``, followed by ``text`` verbatim.

    The file is always encoded as UTF-8 rather than in the locale's preferred
    encoding, so saving does not fail on non-ASCII content when the process
    runs under a restrictive locale.

    Args:
        link: Link object of ``hosts.txt``.
        text: Content of ``hosts.txt``.

    Returns:
        Saved path to ``hosts.txt``.

    """
    path = os.path.join(link.base, 'hosts.txt')
    os.makedirs(os.path.dirname(path), exist_ok=True)

    with open(path, 'w', encoding='utf-8') as file:
        # header records where the content was fetched from
        print(f'# {link.url}', file=file)
        file.write(text)
    return path
def read_hosts(link: 'darc_link.Link', text: str, check: bool = CHECK) -> 'List[darc_link.Link]':
    """Read ``hosts.txt``.

    Each non-blank line that is not a ``#`` comment is split at its first
    ``=``; the part before it is taken as a hostname. Hostnames fully matching
    :data:`~darc.proxy.i2p.I2P_REGEX` are turned into ``http://<hostname>``
    links that refer back to ``link``.

    Args:
        link: Link object the ``hosts.txt`` content belongs to; used as the
            back reference of every extracted link.
        text: Content of ``hosts.txt``.
        check: If perform checks on extracted links,
            default to :data:`~darc.const.CHECK`.

    Returns:
        List of links extracted.

    """
    extracted = []
    for raw_line in text.splitlines():
        entry = raw_line.strip()
        # skip blank lines and comments
        if not entry or entry.startswith('#'):
            continue

        hostname = entry.split('=', maxsplit=1)[0]
        if I2P_REGEX.fullmatch(hostname) is not None:
            extracted.append(parse_link(f'http://{hostname}', backref=link))

    return _check(extracted) if check else extracted
def fetch_hosts(link: 'darc_link.Link', force: bool = False) -> None:
    """Fetch ``hosts.txt``.

    Uses the locally saved ``hosts.txt`` when one exists and ``force`` is not
    set; otherwise requests ``/hosts.txt`` from the host of ``link`` through
    an I2P session and saves the response. The links extracted from the
    content are then handed to :func:`darc.db.save_requests`.

    The function returns early, without queueing anything, when the request
    raises, the response status is not OK, or the content type is neither
    ``text/text`` nor ``text/plain``.

    Args:
        link: Link object to fetch for its ``hosts.txt``.
        force: Force refetch ``hosts.txt``.

    """
    if force:
        logger.warning('[HOSTS] Force refetch %s', link.url)

    hosts_path = None if force else have_hosts(link)
    if hosts_path is not None:
        logger.warning('[HOSTS] Cached %s', link.url)  # pylint: disable=no-member
        # Decode explicitly instead of relying on the locale. Undecodable
        # bytes are replaced so a stray byte cannot abort processing of the
        # whole cached file.
        with open(hosts_path, encoding='utf-8', errors='replace') as hosts_file:
            hosts_text = hosts_file.read()
    else:
        from darc.requests import i2p_session  # pylint: disable=import-outside-toplevel

        hosts_link = parse_link(urljoin(link.url, '/hosts.txt'), backref=link)
        logger.info('[HOSTS] Subscribing %s', hosts_link.url)

        with i2p_session() as session:
            try:
                response = session.get(hosts_link.url)
            except requests.RequestException:
                logger.pexc(message=f'[HOSTS] Failed on {hosts_link.url}')
                return

            if not response.ok:
                logger.error('[HOSTS] Failed on %s [%d]', hosts_link.url, response.status_code)
                return

            ct_type = get_content_type(response)
            if ct_type not in ['text/text', 'text/plain']:
                logger.error('[HOSTS] Unresolved content type on %s (%s)', hosts_link.url, ct_type)
                return

            hosts_text = response.text
            save_hosts(hosts_link, hosts_text)

        logger.info('[HOSTS] Subscribed %s', hosts_link.url)

    from darc.db import save_requests  # pylint: disable=import-outside-toplevel

    # add link to queue
    save_requests(read_hosts(link, hosts_text))