feat: add adblock functionality with support for external blocklists

This commit is contained in:
Abolfazl
2026-05-05 07:27:49 +03:30
parent bd3d1943b0
commit 64363ed531
5 changed files with 324 additions and 2 deletions
+265
View File
@@ -0,0 +1,265 @@
"""
Adblock hosts list loader.

Downloads and caches domain blocklists at startup, then merges them into the
proxy's block-host rules. Supports two common list formats:

  • Bare domain per line  — used by PersianBlocker Hosts files
  • Standard hosts format — "0.0.0.0 domain.com" / "127.0.0.1 domain.com"

Comments (#), wildcards (analytics-*.example.com), and raw IP addresses are
skipped automatically.

Usage from proxy_server.py:

    from core.adblock import load_all, refresh_all

    # Synchronous load at startup (uses disk cache if available):
    domains = load_all(config["adblock_lists"])

    # Async background refresh (re-downloads stale lists):
    await refresh_all(config["adblock_lists"], callback=update_fn)
"""
import asyncio
import hashlib
import ipaddress
import logging
import pathlib
import re
import time
import urllib.request
# Module-wide logger; every message emitted by this file goes through it.
log = logging.getLogger("Adblock")

# Re-download a list when the cached copy is older than this (seconds).
_DEFAULT_MAX_AGE = 86_400  # 24 hours
_DOWNLOAD_TIMEOUT = 30  # seconds per HTTP request

# Directory holding one cached text file per list URL.
# NOTE: this is a *relative* path, so it is resolved against the process's
# current working directory. It only ends up next to main.py / config.json
# when the proxy is started from the project root.
_CACHE_DIR = pathlib.Path("adblock_cache")
# Patterns used during line parsing
_IP_RE = re.compile(
    r"^(?:\d{1,3}\.){3}\d{1,3}$"      # IPv4
    r"|^[0-9a-fA-F:]{2,39}$"          # IPv6 (rough match)
)
_WILDCARD_RE = re.compile(r"[*?]")

# Minimal domain sanity check: must contain at least one dot, only ASCII
# label characters, and no leading/trailing hyphens in any label. The final
# label is either alphabetic ("com", "ir") or a punycode IDN TLD
# ("xn--p1ai", "xn--mgba3a4f16a").
_DOMAIN_RE = re.compile(
    r"^(?:[a-z0-9](?:[a-z0-9\-]{0,61}[a-z0-9])?\.)+"
    r"(?:[a-z]{2,}|xn--[a-z0-9\-]{0,58}[a-z0-9])$"
)

_SKIP_NAMES = frozenset({
    "localhost", "local", "broadcasthost",
    "localhost.localdomain", "ip6-localhost",
    "ip6-loopback",
})

# These addresses appear in standard hosts files as the "null" target —
# they are address fields, not domain names.
_HOSTS_PREFIXES = frozenset({"0.0.0.0", "127.0.0.1", "::1", "::0"})


# ── List parsing ──────────────────────────────────────────────────────────────

def _is_blockable_domain(domain: str) -> bool:
    """Return True when *domain* (already lowercased) may be added to the block set."""
    # Wildcards (analytics-*.example.com) can't be matched safely, and raw IP
    # literals are not host names.
    if _WILDCARD_RE.search(domain) or _IP_RE.match(domain):
        return False
    if domain in _SKIP_NAMES:
        return False
    # No ipaddress.ip_address() probe is needed here: _DOMAIN_RE only allows
    # [a-z0-9.-] and requires an alphabetic or "xn--" final label, so neither
    # an IPv4 nor an IPv6 literal can pass it. Skipping the probe avoids
    # raising and swallowing a ValueError for every real domain, which is
    # costly on lists with hundreds of thousands of entries.
    return _DOMAIN_RE.match(domain) is not None


def parse_hosts_text(text: str) -> list[str]:
    """Parse a hosts-format (or bare-domain-per-line) text.

    Accepted line shapes:
      • ``domain.com``                      — bare domain
      • ``0.0.0.0 domain.com [alias ...]``  — hosts entry whose address is one
        of ``_HOSTS_PREFIXES``; every host name on the line is collected.

    A ``#`` starts a comment when it begins the line or follows whitespace
    (space or tab). A ``#`` glued to other characters is left alone, so
    filter-syntax lines such as ``example.com##.banner`` are rejected as
    invalid rather than misread as ``example.com``.

    Returns a deduplicated list of valid, lowercase domain strings in
    first-seen order. Wildcards, raw IPs, comments, the reserved names in
    ``_SKIP_NAMES`` and lines with an unknown address prefix are dropped.
    """
    seen: set[str] = set()
    domains: list[str] = []

    # A UTF-8 byte-order mark is not whitespace for str.strip(); left in place
    # it would glue itself to the first token and hide the first entry.
    for raw_line in text.removeprefix("\ufeff").splitlines():
        line = raw_line.strip()

        # Skip blank lines and full-line comments
        if not line or line.startswith("#"):
            continue

        # Strip inline comments: cut at the first token that starts with '#'.
        # The first token can never start with '#' (handled above), so at
        # least one token always survives.
        parts = line.split()
        for index, token in enumerate(parts):
            if token.startswith("#"):
                del parts[index:]
                break

        if len(parts) >= 2 and parts[0] in _HOSTS_PREFIXES:
            # Standard hosts format: "0.0.0.0 domain.com [more.domain.com ...]"
            candidates = parts[1:]
        elif len(parts) == 1:
            # Bare domain format
            candidates = parts
        else:
            # Multiple words with an unknown prefix — skip
            continue

        for candidate in candidates:
            # Normalise case and drop the trailing dot of a fully qualified name.
            domain = candidate.lower().rstrip(".")
            if domain in seen or not _is_blockable_domain(domain):
                continue
            seen.add(domain)
            domains.append(domain)

    return domains
# ── Disk cache helpers ────────────────────────────────────────────────────────
def _cache_path(url: str) -> pathlib.Path:
    """Return the cache file path for *url* inside ``_CACHE_DIR``.

    The file name is the first 16 hex characters of the URL's SHA-1 digest
    plus ``.txt``, so the same URL always maps to the same file.
    """
    # The digest is only a stable file name, not a security primitive.
    # Saying so keeps this call working on FIPS-restricted builds, where
    # SHA-1 may be refused for security use. The hash value is unaffected,
    # so existing cache files keep their names.
    digest = hashlib.sha1(url.encode(), usedforsecurity=False).hexdigest()
    return _CACHE_DIR / f"{digest[:16]}.txt"
def _cache_is_stale(url: str, max_age: int) -> bool:
    """Return True when the cached copy of *url* is missing, unreadable or too old.

    Args:
        url: List URL whose cache file is checked.
        max_age: Maximum acceptable age of the cache file, in seconds.

    Returns:
        True if the file cannot be stat'ed (including when it does not
        exist) or its modification time is more than *max_age* seconds in
        the past; False otherwise.
    """
    # A single stat() inside the try covers every failure mode. A separate
    # exists() probe adds a second syscall and, on Python < 3.12, lets
    # errors such as "permission denied" escape to the caller.
    try:
        modified_at = _cache_path(url).stat().st_mtime
    except OSError:
        return True
    return (time.time() - modified_at) > max_age
def _read_cache(url: str) -> list[str] | None:
    """Load and parse the cached list for *url*.

    Returns the parsed domain list, or None when the cache file cannot be
    read (for example because it does not exist). Bytes that are not valid
    UTF-8 are replaced rather than raising.
    """
    cache_file = _cache_path(url)
    try:
        contents = cache_file.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return None
    return parse_hosts_text(contents)
def _write_cache(url: str, text: str) -> None:
    """Persist the raw list *text* as the cache file for *url*.

    The text is written to a sibling ``.tmp`` file and then renamed over the
    real cache file. The cache file therefore always holds either the
    previous complete copy or the new complete copy. A crash or full disk
    mid-write cannot leave a truncated file with a fresh mtime that would be
    trusted as a valid, non-stale cache.

    Best-effort: any OSError is logged as a warning and swallowed.
    """
    target = _cache_path(url)
    scratch = target.with_suffix(".tmp")
    try:
        _CACHE_DIR.mkdir(parents=True, exist_ok=True)
        scratch.write_text(text, encoding="utf-8")
        # Path.replace() overwrites an existing destination.
        scratch.replace(target)
    except OSError as exc:
        log.warning("Adblock: cache write failed: %s", exc)
        # Don't leave a half-written scratch file behind.
        try:
            scratch.unlink(missing_ok=True)
        except OSError as cleanup_exc:
            log.debug("Adblock: could not remove %s: %s", scratch, cleanup_exc)
# ── Network fetch ─────────────────────────────────────────────────────────────
def _fetch(url: str) -> str | None:
    """Download *url* with a blocking HTTP GET and return the body as text.

    Meant to be run via ``asyncio.to_thread()`` when called from async code.
    The body is decoded as UTF-8 with undecodable bytes replaced. Any
    failure (network, HTTP, timeout, ...) is logged as a warning and
    reported by returning None.
    """
    request_headers = {"User-Agent": "MasterHttpRelayVPN/adblock-updater"}
    try:
        request = urllib.request.Request(url, headers=request_headers)
        response = urllib.request.urlopen(request, timeout=_DOWNLOAD_TIMEOUT)
        with response:
            payload = response.read()
        return payload.decode("utf-8", errors="replace")
    except Exception as error:
        log.warning("Adblock: download failed (%s): %s", url, error)
        return None
# ── Public API ────────────────────────────────────────────────────────────────
def load_all(urls: list[str], max_age: int = _DEFAULT_MAX_AGE) -> list[str]:
    """Synchronously load all lists. Called once at proxy startup.

    Strategy:
      • If a cached copy exists (even if stale), return it immediately so
        startup is never blocked by network I/O.
      • If there is NO cached copy for a URL, download it now (one-time,
        first-run penalty) so the adblock is active from the first request.
      • A download that yields no valid domains (e.g. an error or
        captive-portal page) is treated as a failure and is NOT cached, so
        it cannot mask the real list until the cache goes stale.

    Stale caches will be refreshed later by ``refresh_all()``.

    Args:
        urls: Blocklist URLs; blank entries are ignored.
        max_age: Accepted so the signature mirrors ``refresh_all()``; not
            consulted here because stale caches are deliberately used as-is.

    Returns:
        The domains of every list that could be loaded, concatenated in
        *urls* order. Duplicates across lists are not removed.
    """
    all_domains: list[str] = []

    for url in urls:
        url = url.strip()
        if not url:
            continue

        # Short name for log lines. Stripping trailing slashes first keeps
        # the label non-empty for URLs that end in "/".
        label = url.rstrip("/").rsplit("/", 1)[-1] or url

        cached = _read_cache(url)
        if cached is not None:
            log.info(
                "Adblock: %d domains loaded from cache (%s)",
                len(cached),
                label,
            )
            all_domains.extend(cached)
            continue

        log.info("Adblock: no cache for %s — downloading...", label)
        text = _fetch(url)
        if not text:
            log.warning("Adblock: could not load %s — adblock disabled for this list", url)
            continue

        # Validate before persisting: only a response that actually contains
        # domains is worth caching.
        domains = parse_hosts_text(text)
        if not domains:
            log.warning(
                "Adblock: %s returned no valid domains — not caching; "
                "adblock disabled for this list",
                url,
            )
            continue

        _write_cache(url, text)
        log.info(
            "Adblock: downloaded %d domains from %s",
            len(domains),
            label,
        )
        all_domains.extend(domains)

    return all_domains
async def refresh_all(
    urls: list[str],
    max_age: int = _DEFAULT_MAX_AGE,
    callback=None,
) -> list[str]:
    """Async background refresh. Re-downloads lists whose cache is stale.

    For each URL:
      • fresh cache  → the cached list is used;
      • stale cache  → the list is downloaded again. The cache is replaced
        only if the download contains at least one valid domain; otherwise
        (download failed, or an error page was served) the existing cache is
        kept rather than losing protection.

    Downloads, parsing and cache reads/writes all run in worker threads via
    ``asyncio.to_thread()`` so large lists do not stall the event loop.

    Args:
        urls: Blocklist URLs; blank entries are ignored.
        max_age: Cache age in seconds beyond which a list is re-downloaded.
        callback: Optional ``callback(domains: list[str])``. Called on the
            event loop, once, after at least one list was successfully
            updated, letting the proxy hot-swap the active block set without
            restarting. If it returns a coroutine (i.e. it is an
            ``async def``), that coroutine is awaited.

    Returns:
        The combined domain list (same value passed to *callback*).
        Duplicates across lists are not removed.
    """
    all_domains: list[str] = []
    changed = False

    for url in urls:
        url = url.strip()
        if not url:
            continue

        # Short name for log lines; tolerate URLs that end in "/".
        label = url.rstrip("/").rsplit("/", 1)[-1] or url

        refreshed: list[str] | None = None
        if _cache_is_stale(url, max_age):
            log.info("Adblock: refreshing %s ...", label)
            text = await asyncio.to_thread(_fetch, url)
            if text:
                # Parse first, write second: a junk response must never
                # overwrite a good cached copy.
                parsed = await asyncio.to_thread(parse_hosts_text, text)
                if parsed:
                    await asyncio.to_thread(_write_cache, url, text)
                    log.info(
                        "Adblock: refreshed %d domains from %s",
                        len(parsed),
                        label,
                    )
                    refreshed = parsed
                    changed = True
                else:
                    log.warning(
                        "Adblock: %s returned no valid domains — keeping cached copy",
                        url,
                    )

        if refreshed is None:
            # Fresh cache, or refresh failed: fall back to whatever is on
            # disk. Reading also parses the whole list, so it is offloaded.
            refreshed = (await asyncio.to_thread(_read_cache, url)) or []

        all_domains.extend(refreshed)

    if changed and callback is not None:
        outcome = callback(all_domains)
        if asyncio.iscoroutine(outcome):
            await outcome

    return all_domains