# -*- coding: utf-8 -*-
"""Link Database
===================
The :mod:`darc` project utilises file system based database
to provide tele-process communication.
.. note::
In its first implementation, the :mod:`darc` project used
|Queue|_ to support such communication. However, as noticed
when runtime, the |Queue| object will be much affected by
the lack of memory.
.. |Queue| replace:: ``multiprocessing.Queue``
.. _Queue: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
There will be two databases, both locate at root of the
data storage path :data:`~darc.const.PATH_DB`:
* the |requests|_ database -- ``queue_requests.txt``
* the |selenium|_ database -- ``queue_selenium.txt``
At runtime, after reading such database, :mod:`darc`
will keep a backup of the database with ``.tmp`` suffix
to its file extension.
"""
import math
import os
import pickle
import pprint
import shutil
import time
import stem.util.term
import darc.typing as typing
from darc.const import CHECK
from darc.const import REDIS as redis
from darc.const import TIME_CACHE, VERBOSE
from darc.error import render_error
from darc.link import Link
from darc.parse import _check, match_proxy
from darc.proxy.freenet import _FREENET_BS_FLAG, freenet_bootstrap, has_freenet
from darc.proxy.i2p import _I2P_BS_FLAG, has_i2p, i2p_bootstrap
from darc.proxy.tor import _TOR_BS_FLAG, has_tor, tor_bootstrap
from darc.proxy.zeronet import _ZERONET_BS_FLAG, has_zeronet, zeronet_bootstrap
# bulk size
BULK_SIZE = int(os.getenv('DARC_BULK_SIZE', '100'))
# max pool
MAX_POOL = float(os.getenv('DARC_MAX_POOL', '1_000'))
if math.isfinite(MAX_POOL):
MAX_POOL = math.floor(MAX_POOL)
[docs]def save_requests(entries: typing.Iterable[Link], single: bool = False,
score=None, nx=False, xx=False):
"""Save link to the |requests|_ database.
Args:
entries: Links to be added to the |requests|_ database.
It can be either an *iterable* of links, or a single
link string (if ``single`` set as ``True``).
single: Indicate if ``entries`` is an *iterable* of links
or a single link string.
score: Score to for the Redis sorted set.
nx: Forces ``ZADD`` to only create new elements and not to
update scores for elements that already exist.
xx: Forces ``ZADD`` to only update scores of elements that
already exist. New elements will not be added.
"""
if not entries:
return
if score is None:
score = time.time()
if not single:
for i in range(0, len(entries), BULK_SIZE):
mapping = {
pickle.dumps(link): score for link in entries[i:i + BULK_SIZE]
}
redis.zadd('queue_requests', mapping, nx=nx, xx=xx)
return
mapping = {
pickle.dumps(entries): score,
}
redis.zadd('queue_requests', mapping, nx=nx, xx=xx)
[docs]def save_selenium(entries: typing.Iterable[Link], single: bool = False,
score=None, nx=False, xx=False):
"""Save link to the |selenium|_ database.
Args:
entries: Links to be added to the |selenium|_ database.
It can be either an *iterable* of links, or a single
link string (if ``single`` set as ``True``).
single: Indicate if ``entries`` is an *iterable* of links
or a single link string.
score: Score to for the Redis sorted set.
nx: Forces ``ZADD`` to only create new elements and not to
update scores for elements that already exist.
xx: Forces ``ZADD`` to only update scores of elements that
already exist. New elements will not be added.
"""
if not entries:
return
if score is None:
score = time.time()
if not single:
for i in range(0, len(entries), BULK_SIZE):
mapping = {
pickle.dumps(link): score for link in entries[i:i + BULK_SIZE]
}
redis.zadd('queue_selenium', mapping, nx=nx, xx=xx)
return
mapping = {
pickle.dumps(entries): score,
}
redis.zadd('queue_selenium', mapping, nx=nx, xx=xx)
[docs]def load_requests(check: bool = CHECK) -> typing.List[Link]:
"""Load link from the |requests|_ database.
Args:
check: If perform checks on loaded links,
default to :data:`~darc.const.CHECK`.
Returns:
List of loaded links from the |requests|_ database.
Note:
At runtime, the function will load links with maximum number
at :data:`~darc.db.MAX_POOL` to limit the memory usage.
"""
now = time.time()
if TIME_CACHE is None:
max_score = now
else:
sec_delta = TIME_CACHE.total_seconds()
max_score = now - sec_delta
with redis.lock('lock_queue_requests'):
link_pool = [pickle.loads(link) for link in redis.zrangebyscore('queue_requests', min=0, max=max_score,
start=0, num=MAX_POOL)]
if TIME_CACHE is not None:
new_score = now + sec_delta
save_requests(link_pool, score=new_score) # force update records
if check:
link_pool = _check(link_pool)
if not match_proxy('tor') and not _TOR_BS_FLAG and has_tor(link_pool):
tor_bootstrap()
if not match_proxy('i2p') and not _I2P_BS_FLAG and has_i2p(link_pool):
i2p_bootstrap()
if not match_proxy('zeronet') and not _ZERONET_BS_FLAG and has_zeronet(link_pool):
zeronet_bootstrap()
if not match_proxy('freenet') and not _FREENET_BS_FLAG and has_freenet(link_pool):
freenet_bootstrap()
if VERBOSE:
print(stem.util.term.format('-*- [REQUESTS] LINK POOL -*-',
stem.util.term.Color.MAGENTA)) # pylint: disable=no-member
print(render_error(pprint.pformat(sorted(link.url for link in link_pool)),
stem.util.term.Color.MAGENTA)) # pylint: disable=no-member
print(stem.util.term.format('-' * shutil.get_terminal_size().columns,
stem.util.term.Color.MAGENTA)) # pylint: disable=no-member
return link_pool
[docs]def load_selenium(check: bool = CHECK) -> typing.List[Link]:
"""Load link from the |selenium|_ database.
Args:
check: If perform checks on loaded links,
default to :data:`~darc.const.CHECK`.
Returns:
List of loaded links from the |selenium|_ database.
Note:
At runtime, the function will load links with maximum number
at :data:`~darc.db.MAX_POOL` to limit the memory usage.
"""
now = time.time()
if TIME_CACHE is None:
max_score = now
else:
sec_delta = TIME_CACHE.total_seconds()
max_score = now - sec_delta
with redis.lock('lock_queue_selenium'):
link_pool = [pickle.loads(link) for link in redis.zrangebyscore('queue_selenium', min=0, max=max_score,
start=0, num=MAX_POOL)]
if TIME_CACHE is not None:
new_score = now + sec_delta
save_selenium(link_pool, score=new_score) # force update records
if check:
link_pool = _check(link_pool)
if not match_proxy('tor') and not _TOR_BS_FLAG and has_tor(link_pool):
tor_bootstrap()
if not match_proxy('i2p') and not _I2P_BS_FLAG and has_i2p(link_pool):
i2p_bootstrap()
if not match_proxy('zeronet') and not _ZERONET_BS_FLAG and has_zeronet(link_pool):
zeronet_bootstrap()
if not match_proxy('freenet') and not _FREENET_BS_FLAG and has_freenet(link_pool):
freenet_bootstrap()
if VERBOSE:
print(stem.util.term.format('-*- [SELENIUM] LINK POOL -*-',
stem.util.term.Color.MAGENTA)) # pylint: disable=no-member
print(render_error(pprint.pformat(sorted(link.url for link in link_pool)),
stem.util.term.Color.MAGENTA)) # pylint: disable=no-member
print(stem.util.term.format('-' * shutil.get_terminal_size().columns,
stem.util.term.Color.MAGENTA)) # pylint: disable=no-member
return link_pool