Enhance parallel relay functionality and SNI rotation in domain fronting

This commit is contained in:
Abolfazl
2026-04-22 19:13:26 +03:30
parent f7fc567351
commit edd9af03a3
4 changed files with 401 additions and 33 deletions
+1
View File
@@ -10,6 +10,7 @@
"socks5_port": 1080,
"log_level": "INFO",
"verify_ssl": true,
"parallel_relay": 1,
"block_hosts": [],
"bypass_hosts": [
"localhost",
+24
View File
@@ -45,6 +45,30 @@ BATCH_WINDOW_MACRO = 0.050 # 50 ms
BATCH_MAX = 50
# ── Fan-out relay (parallel Apps Script instances) ────────────────────────
# How long, in seconds, to ignore a script ID after it fails or is
# unreasonably slow.
SCRIPT_BLACKLIST_TTL = 600.0 # 10 minutes
# ── SNI rotation pool ─────────────────────────────────────────────────────
# Google-owned SNIs that share the same edge IPs as www.google.com.
# When `front_domain` is a Google property, we rotate through this pool on
# each new outbound TLS handshake so DPI systems don't see a constant
# "always www.google.com" pattern from the client.
# An explicit `front_domains` list in the config takes precedence over this
# pool, and the configured `front_domain` is always placed first.
FRONT_SNI_POOL_GOOGLE: tuple[str, ...] = (
"www.google.com",
"mail.google.com",
"drive.google.com",
"docs.google.com",
"calendar.google.com",
)
# ── Per-host stats ────────────────────────────────────────────────────────
# The periodic stats report is emitted at DEBUG level only.
STATS_LOG_INTERVAL = 300.0 # seconds — how often to log per-host totals
STATS_LOG_TOP_N = 10 # how many hosts to include in the log
# ── Direct Google tunnel allow / exclude ──────────────────────────────────
# Google web-apps whose real origin must go through the Apps Script relay
# because direct SNI tunneling to them does not work reliably behind DPI.
+346 -14
View File
@@ -14,8 +14,10 @@ import hashlib
import json
import logging
import re
import socket
import ssl
import time
from dataclasses import dataclass
from urllib.parse import urlparse
import codec
@@ -24,12 +26,16 @@ from constants import (
BATCH_WINDOW_MACRO,
BATCH_WINDOW_MICRO,
CONN_TTL,
FRONT_SNI_POOL_GOOGLE,
POOL_MAX,
POOL_MIN_IDLE,
RELAY_TIMEOUT,
SCRIPT_BLACKLIST_TTL,
SEMAPHORE_MAX,
STATEFUL_HEADER_NAMES,
STATIC_EXTS,
STATS_LOG_INTERVAL,
STATS_LOG_TOP_N,
TLS_CONNECT_TIMEOUT,
WARM_POOL_COUNT,
)
@@ -37,12 +43,56 @@ from constants import (
log = logging.getLogger("Fronter")
@dataclass
class HostStat:
    """Running traffic counters for one host.

    One instance is kept per host key so slow or heavy sites can be
    spotted when profiling. Every counter starts at zero and is only
    ever incremented.
    """

    # Number of relayed requests recorded for the host.
    requests: int = 0
    # Cache-hit counter (not updated by the code visible in this hunk).
    cache_hits: int = 0
    # Total response bytes recorded for the host.
    bytes: int = 0
    # Sum of per-request latencies, in nanoseconds.
    total_latency_ns: int = 0
    # Number of recorded requests that ended with an exception.
    errors: int = 0
def _build_sni_pool(front_domain: str, overrides: list | None) -> list[str]:
"""Build the list of SNIs to rotate through on new outbound TLS handshakes.
Priority:
1. Explicit `front_domains` list in config (overrides).
2. If `front_domain` is a Google property, use FRONT_SNI_POOL_GOOGLE
(all share the same Google edge IP, so rotation is invisible to
the relay but breaks DPI's "always www.google.com" heuristic).
3. Fall back to the single configured `front_domain`.
"""
if overrides:
seen: set[str] = set()
out: list[str] = []
for item in overrides:
host = str(item).strip().lower().rstrip(".")
if host and host not in seen:
seen.add(host)
out.append(host)
if out:
return out
fd = (front_domain or "").lower().rstrip(".")
if fd.endswith(".google.com") or fd == "google.com":
# Ensure the configured front_domain is first (stable default).
pool = [fd] + [h for h in FRONT_SNI_POOL_GOOGLE if h != fd]
return pool
return [fd] if fd else ["www.google.com"]
class DomainFronter:
_STATIC_EXTS = STATIC_EXTS
def __init__(self, config: dict):
self.connect_host = config.get("google_ip", "216.239.38.120")
self.sni_host = config.get("front_domain", "www.google.com")
# SNI rotation pool — rotated per new outbound TLS connection so
# DPI systems can't fingerprint traffic as "always one SNI".
self._sni_hosts = _build_sni_pool(
self.sni_host, config.get("front_domains"),
)
self._sni_idx = 0
self.http_host = "script.google.com"
# Multi-script round-robin for higher throughput
script = config.get("script_ids") or config.get("script_id")
@@ -51,6 +101,23 @@ class DomainFronter:
self.script_id = self._script_ids[0] # backward compat / logging
self._dev_available = False # True if /dev endpoint works (no redirect, ~400ms faster)
# Fan-out parallel relay: fire N Apps Script instances concurrently,
# keep the first successful response, cancel the rest. Script IDs
# that fail or time out get blacklisted for SCRIPT_BLACKLIST_TTL so
# a single slow container stops poisoning tail latency.
try:
self._parallel_relay = int(config.get("parallel_relay", 1))
except (TypeError, ValueError):
self._parallel_relay = 1
self._parallel_relay = max(1, min(self._parallel_relay,
len(self._script_ids)))
self._sid_blacklist: dict[str, float] = {}
self._blacklist_ttl = SCRIPT_BLACKLIST_TTL
# Per-host stats (requests, cache hits, bytes, cumulative latency).
self._per_site: dict[str, HostStat] = {}
self._stats_task: asyncio.Task | None = None
self.auth_key = config.get("auth_key", "")
self.verify_ssl = config.get("verify_ssl", True)
@@ -86,13 +153,21 @@ class DomainFronter:
from h2_transport import H2Transport, H2_AVAILABLE
if H2_AVAILABLE:
self._h2 = H2Transport(
self.connect_host, self.sni_host, self.verify_ssl
self.connect_host, self.sni_host, self.verify_ssl,
sni_hosts=self._sni_hosts,
)
log.info("HTTP/2 multiplexing available — "
"all requests will share one connection")
except ImportError:
pass
if len(self._sni_hosts) > 1:
log.info("SNI rotation pool (%d): %s",
len(self._sni_hosts), ", ".join(self._sni_hosts))
if self._parallel_relay > 1:
log.info("Fan-out relay: %d parallel Apps Script instances per request",
self._parallel_relay)
# Capability log for content encodings.
log.info("Response codecs: %s", codec.supported_encodings())
@@ -108,15 +183,35 @@ class DomainFronter:
async def _open(self):
    """Open a TLS connection to the CDN over a pre-configured TCP socket.

    - TCP_NODELAY is set on the underlying socket so small H2/H1 writes
      aren't held back by Nagle's algorithm (up to ~40 ms per batch).
    - The *server_hostname* parameter sets the **TLS SNI** extension;
      we rotate across `self._sni_hosts` so DPI can't fingerprint
      "always www.google.com" from the client side.

    Returns:
        The ``(reader, writer)`` pair from ``asyncio.open_connection``.

    Raises:
        Whatever the TCP connect or TLS handshake raises. The raw socket
        is closed before the error propagates, including on cancellation.
    """
    loop = asyncio.get_running_loop()
    # NOTE(review): AF_INET means connect_host is assumed to be IPv4 — confirm.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setblocking(False)
        await loop.sock_connect(sock, (self.connect_host, 443))
        return await asyncio.open_connection(
            sock=sock,
            ssl=self._ssl_ctx(),
            server_hostname=self._next_sni(),
        )
    except BaseException:
        # BaseException, not Exception: asyncio.CancelledError (raised when
        # a caller's timeout cancels us) must not leak the file descriptor.
        try:
            sock.close()
        except OSError:
            pass
        raise
def _next_sni(self) -> str:
"""Round-robin the next SNI from the rotation pool."""
sni = self._sni_hosts[self._sni_idx % len(self._sni_hosts)]
self._sni_idx += 1
return sni
async def _acquire(self):
"""Get a healthy TLS connection from pool (TTL-checked) or open new."""
@@ -160,11 +255,69 @@ class DomainFronter:
pass
def _next_script_id(self) -> str:
"""Round-robin across script IDs for load distribution."""
sid = self._script_ids[self._script_idx % len(self._script_ids)]
"""Round-robin across script IDs for load distribution.
Skips script IDs currently in the short-term blacklist (failing
or slow) unless *all* are blacklisted, in which case we fall back
to plain round-robin so traffic can still flow.
"""
n = len(self._script_ids)
for _ in range(n):
sid = self._script_ids[self._script_idx % n]
self._script_idx += 1
if not self._is_sid_blacklisted(sid):
return sid
# All blacklisted — clear expired entries and fall back.
self._prune_blacklist(force=True)
sid = self._script_ids[self._script_idx % n]
self._script_idx += 1
return sid
def _is_sid_blacklisted(self, sid: str) -> bool:
until = self._sid_blacklist.get(sid, 0.0)
if until and until > time.time():
return True
if until:
self._sid_blacklist.pop(sid, None)
return False
def _blacklist_sid(self, sid: str, reason: str = "") -> None:
"""Blacklist a script ID for SCRIPT_BLACKLIST_TTL seconds."""
if len(self._script_ids) <= 1:
return # Nothing to fall back to — blacklist would be pointless.
self._sid_blacklist[sid] = time.time() + self._blacklist_ttl
log.warning("Blacklisted script %s for %ds%s",
sid[-8:] if len(sid) > 8 else sid,
int(self._blacklist_ttl),
f" ({reason})" if reason else "")
def _prune_blacklist(self, force: bool = False) -> None:
now = time.time()
for sid, until in list(self._sid_blacklist.items()):
if force or until <= now:
self._sid_blacklist.pop(sid, None)
def _pick_fanout_sids(self, key: str | None) -> list[str]:
"""Pick up to `parallel_relay` distinct non-blacklisted script IDs.
The first ID is the stable per-host choice (same as single-shot
routing); the rest are filled from the remaining pool. This keeps
session-sensitive hosts pinned to one script while still racing
extras for lower tail latency.
"""
if self._parallel_relay <= 1 or len(self._script_ids) <= 1:
return [self._script_id_for_key(key)]
primary = self._script_id_for_key(key)
picked = [primary]
others = [s for s in self._script_ids
if s != primary and not self._is_sid_blacklisted(s)]
# Round-robin-ish selection from `others`
for sid in others:
if len(picked) >= self._parallel_relay:
break
picked.append(sid)
return picked
@staticmethod
def _host_key(url_or_host: str | None) -> str:
"""Return a stable routing key for a URL or host string."""
@@ -174,6 +327,76 @@ class DomainFronter:
host = parsed.hostname or url_or_host
return host.lower().rstrip(".")
# ── Per-host stats ────────────────────────────────────────────
def _record_site(self, url: str, bytes_: int, latency_ns: int,
errored: bool) -> None:
host = self._host_key(url)
if not host:
return
stat = self._per_site.get(host)
if stat is None:
stat = HostStat()
self._per_site[host] = stat
stat.requests += 1
stat.bytes += max(0, int(bytes_))
stat.total_latency_ns += max(0, int(latency_ns))
if errored:
stat.errors += 1
def stats_snapshot(self) -> dict:
    """Build a point-in-time report of traffic and script health.

    Returns:
        A dict with four keys: ``per_site`` (one row per host, heaviest
        by bytes first), ``blacklisted_scripts`` (unexpired blacklist
        entries with the last 12 characters of the ID and remaining
        seconds), ``sni_rotation`` (a copy of the SNI pool) and
        ``parallel_relay``.
    """
    rows = [
        {
            "host": host,
            "requests": stat.requests,
            "errors": stat.errors,
            "bytes": stat.bytes,
            # Mean latency in milliseconds; 0.0 when nothing was recorded.
            "avg_ms": round(
                (stat.total_latency_ns / stat.requests / 1e6)
                if stat.requests else 0.0,
                1,
            ),
        }
        for host, stat in self._per_site.items()
    ]
    rows.sort(key=lambda row: row["bytes"], reverse=True)

    now = time.time()
    still_banned = []
    for sid, until in self._sid_blacklist.items():
        if until > now:
            still_banned.append({
                # [-12:] leaves IDs of 12 characters or fewer untouched.
                "sid": sid[-12:],
                "expires_in_s": int(max(0, until - now)),
            })

    return {
        "per_site": rows,
        "blacklisted_scripts": still_banned,
        "sni_rotation": list(self._sni_hosts),
        "parallel_relay": self._parallel_relay,
    }
async def _stats_logger(self):
    """Background loop that logs the heaviest hosts at DEBUG level.

    Wakes every STATS_LOG_INTERVAL seconds. Nothing is computed unless
    DEBUG logging is enabled and at least one host has been recorded.
    Cancellation ends the loop; any other error is logged and the loop
    continues.
    """
    while True:
        try:
            await asyncio.sleep(STATS_LOG_INTERVAL)
            if self._per_site and log.isEnabledFor(logging.DEBUG):
                snapshot = self.stats_snapshot()
                busiest = snapshot["per_site"][:STATS_LOG_TOP_N]
                log.debug("── Per-host stats (top %d by bytes) ──",
                          len(busiest))
                for entry in busiest:
                    log.debug(
                        " %-40s %5d req %2d err %8d KB avg %7.1f ms",
                        entry["host"][:40], entry["requests"],
                        entry["errors"], entry["bytes"] // 1024,
                        entry["avg_ms"],
                    )
                banned = snapshot["blacklisted_scripts"]
                if banned:
                    log.debug(" blacklisted scripts: %s",
                              ", ".join(f"{b['sid']} ({b['expires_in_s']}s)"
                                        for b in banned))
        except asyncio.CancelledError:
            # Task cancelled — leave the loop.
            break
        except Exception as e:
            log.debug("Stats logger error: %s", e)
def _script_id_for_key(self, key: str | None = None) -> str:
"""Pick a stable Apps Script ID for a host or fallback to round-robin.
@@ -181,20 +404,31 @@ class DomainFronter:
host reduces IP/session churn for sites that are sensitive to endpoint
changes. If no key is available, we keep the older round-robin fallback
so warmup/keepalive traffic still distributes normally.
Blacklisted IDs are skipped by probing forward in the list until a
healthy one is found; if none, the stable pick is returned anyway.
"""
if len(self._script_ids) == 1:
return self._script_ids[0]
if not key:
return self._next_script_id()
digest = hashlib.sha1(key.encode("utf-8")).digest()
idx = int.from_bytes(digest[:4], "big") % len(self._script_ids)
return self._script_ids[idx]
base = int.from_bytes(digest[:4], "big") % len(self._script_ids)
n = len(self._script_ids)
for offset in range(n):
sid = self._script_ids[(base + offset) % n]
if not self._is_sid_blacklisted(sid):
return sid
return self._script_ids[base]
def _exec_path(self, url_or_host: str | None = None) -> str:
"""Get the Apps Script endpoint path (/dev or /exec)."""
sid = self._script_id_for_key(self._host_key(url_or_host))
return f"/macros/s/{sid}/{'dev' if self._dev_available else 'exec'}"
return self._exec_path_for_sid(sid)
def _exec_path_for_sid(self, sid: str) -> str:
"""Build the /macros/s/<sid>/(dev|exec) path for a specific script ID."""
return f"/macros/s/{sid}/{'dev' if self._dev_available else 'exec'}"
async def _flush_pool(self):
"""Close all pooled connections (they may be stale after errors)."""
async with self._pool_lock:
@@ -271,6 +505,9 @@ class DomainFronter:
# Start continuous pool maintenance
if self._maintenance_task is None:
self._maintenance_task = self._spawn(self._pool_maintenance())
# Periodic per-host stats logger (opt-in via log level)
if self._stats_task is None:
self._stats_task = self._spawn(self._stats_logger())
# Start H2 connection (runs alongside H1 pool)
if self._h2:
self._spawn(self._h2_connect_and_warm())
@@ -424,10 +661,15 @@ class DomainFronter:
payload = self._build_payload(method, url, headers, body)
t0 = time.perf_counter()
errored = False
result: bytes = b""
try:
# Stateful/browser-navigation requests should preserve exact ordering
# and header context; batching/coalescing is reserved for static fetches.
if self._is_stateful_request(method, url, headers, body):
return await self._relay_with_retry(payload)
result = await self._relay_with_retry(payload)
return result
# Coalesce concurrent GETs for the same URL.
# CRITICAL: do NOT coalesce when a Range header is present —
@@ -439,9 +681,17 @@ class DomainFronter:
has_range = True
break
if method == "GET" and not body and not has_range:
return await self._coalesced_submit(url, payload)
result = await self._coalesced_submit(url, payload)
return result
return await self._batch_submit(payload)
result = await self._batch_submit(payload)
return result
except Exception:
errored = True
raise
finally:
latency_ns = int((time.perf_counter() - t0) * 1e9)
self._record_site(url, len(result), latency_ns, errored)
async def _coalesced_submit(self, url: str, payload: dict) -> bytes:
"""Dedup concurrent requests for the same URL (no Range header).
@@ -789,6 +1039,20 @@ class DomainFronter:
async def _relay_with_retry(self, payload: dict) -> bytes:
"""Single relay with one retry on failure. Uses H2 if available."""
# Fan-out: race N Apps Script instances when enabled and H2 is up.
# Cuts tail latency when one container is slow/cold. Only kicks in
# if multiple script IDs are configured and the H2 transport is live.
if (self._parallel_relay > 1
and len(self._script_ids) > 1
and self._h2 and self._h2.is_connected):
try:
return await asyncio.wait_for(
self._relay_fanout(payload), timeout=RELAY_TIMEOUT,
)
except Exception as e:
log.debug("Fan-out relay failed (%s), falling back", e)
# fall through to single-path logic below
# Try HTTP/2 first — much faster (multiplexed, no pool checkout)
if self._h2 and self._h2.is_connected:
for attempt in range(2):
@@ -822,6 +1086,53 @@ class DomainFronter:
else:
raise
async def _relay_fanout(self, payload: dict) -> bytes:
"""Fire the same relay against N distinct script IDs in parallel.
Returns the first successful response; cancels the rest as soon as
one finishes. Any script that raises or loses the race AND later
fails individually is blacklisted for SCRIPT_BLACKLIST_TTL.
"""
host_key = self._host_key(payload.get("u"))
sids = self._pick_fanout_sids(host_key)
if len(sids) <= 1:
# Nothing to race against (e.g. all others blacklisted)
return await self._relay_single_h2_with_sid(payload, sids[0])
tasks = {
asyncio.create_task(
self._relay_single_h2_with_sid(payload, sid)
): sid
for sid in sids
}
winner_result: bytes | None = None
winner_exc: BaseException | None = None
pending = set(tasks.keys())
try:
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED,
)
for t in done:
sid = tasks[t]
exc = t.exception()
if exc is None:
winner_result = t.result()
return winner_result
# This racer failed — blacklist and keep waiting for others
self._blacklist_sid(sid, reason=type(exc).__name__)
winner_exc = exc
# All racers failed
if winner_exc is not None:
raise winner_exc
raise RuntimeError("fan-out relay: all racers failed")
finally:
for t in pending:
t.cancel()
# Drain cancelled tasks so they don't log warnings
if pending:
await asyncio.gather(*pending, return_exceptions=True)
async def _relay_single_h2(self, payload: dict) -> bytes:
"""Execute a relay through HTTP/2 multiplexing.
@@ -842,6 +1153,27 @@ class DomainFronter:
return self._parse_relay_response(body)
async def _relay_single_h2_with_sid(self, payload: dict,
sid: str) -> bytes:
"""Execute an H2 relay pinned to a specific Apps Script deployment.
Used by `_relay_fanout` to race multiple script IDs in parallel.
Mirrors `_relay_single_h2` but ignores the stable-hash routing.
"""
full_payload = dict(payload)
full_payload["k"] = self.auth_key
json_body = json.dumps(full_payload).encode()
path = self._exec_path_for_sid(sid)
status, headers, body = await self._h2.request(
method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"},
body=json_body,
)
return self._parse_relay_response(body)
async def _relay_single(self, payload: dict) -> bytes:
"""Execute a single relay POST → redirect → parse."""
# Add auth key
+14 -3
View File
@@ -62,10 +62,15 @@ class H2Transport:
"""
def __init__(self, connect_host: str, sni_host: str,
verify_ssl: bool = True):
verify_ssl: bool = True,
sni_hosts: list[str] | None = None):
self.connect_host = connect_host
self.sni_host = sni_host
self.verify_ssl = verify_ssl
# Optional SNI rotation pool — picked round-robin on each new connect.
# Falls back to the single sni_host if no pool is given.
self._sni_hosts: list[str] = [h for h in (sni_hosts or []) if h] or [sni_host]
self._sni_idx: int = 0
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
@@ -107,6 +112,12 @@ class H2Transport:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# Pick next SNI from the rotation pool so repeated reconnects
# don't fingerprint as "always www.google.com".
sni = self._sni_hosts[self._sni_idx % len(self._sni_hosts)]
self._sni_idx += 1
self.sni_host = sni # kept for backward-compat logging
# Create raw TCP socket with TCP_NODELAY BEFORE TLS handshake.
# Nagle's algorithm can delay small writes (H2 frames) by up to 200ms
# waiting to coalesce — TCP_NODELAY forces immediate send.
@@ -124,7 +135,7 @@ class H2Transport:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(
ssl=ctx,
server_hostname=self.sni_host,
server_hostname=sni,
sock=raw,
),
timeout=15,
@@ -165,7 +176,7 @@ class H2Transport:
self._connected = True
self._read_task = asyncio.create_task(self._reader_loop())
log.info("H2 connected → %s (SNI=%s, TCP_NODELAY=on)",
self.connect_host, self.sni_host)
self.connect_host, sni)
async def reconnect(self):
"""Close current connection and re-establish."""