diff --git a/README.md b/README.md index 414beda..3c762a2 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,14 @@ and generates a strong random password for you. Follow the Apps Script deploymen instructions in **Step 2** below before running the wizard so you have a Deployment ID ready. +## Project Structure + +- `src/core/` shared modules (config constants, logging, cert install, LAN, scanner) +- `src/proxy/` local proxy runtime (HTTP/SOCKS, MITM, proxy helpers) +- `src/relay/` Apps Script relay runtime (relay engine, parsing, H2, helpers) +- `apps_script/` deployable edge/runtime scripts +- `docs/exit-node/` exit-node deployment guides + After it's running, jump to **Step 5** (browser proxy) and **Step 6** (CA certificate). @@ -188,7 +196,7 @@ You can deploy any one of these free exit-node templates: 3. Deno Deploy: [`apps_script/deno_deploy.ts`](apps_script/deno_deploy.ts) Full step-by-step deployment guide (all providers): -- [EXIT_NODE_DEPLOYMENT.md](EXIT_NODE_DEPLOYMENT.md) +- [docs/exit-node/EXIT_NODE_DEPLOYMENT.md](docs/exit-node/EXIT_NODE_DEPLOYMENT.md) Set the same PSK secret inside the exit-node code (`PSK` constant) and in `config.json`. diff --git a/README_FA.md b/README_FA.md index 3382819..962725c 100644 --- a/README_FA.md +++ b/README_FA.md @@ -61,6 +61,16 @@ --- +## ساختار پروژه + +- `src/core/` ماژول‌های مشترک (ثابت‌ها، لاگ، نصب گواهی، LAN، اسکنر) +- `src/proxy/` هسته پراکسی محلی (HTTP/SOCKS، MITM، ابزارهای پراکسی) +- `src/relay/` هسته رله Apps Script (موتور رله، پارس پاسخ، H2، ابزارها) +- `apps_script/` اسکریپت‌های deploy روی سرویس‌های edge +- `docs/exit-node/` راهنماهای deployment نود خروجی + +--- + ## راه‌اندازی مرحله‌به‌مرحله ### مرحله 1: دریافت پروژه @@ -147,8 +157,8 @@ cp config.example.json config.json 3. 
Deno Deploy: [apps_script/deno_deploy.ts](apps_script/deno_deploy.ts) راهنمای کامل مرحله‌به‌مرحله برای هر provider: -- [EXIT_NODE_DEPLOYMENT_FA.md](EXIT_NODE_DEPLOYMENT_FA.md) (فارسی) -- [EXIT_NODE_DEPLOYMENT.md](EXIT_NODE_DEPLOYMENT.md) (انگلیسی) +- [docs/exit-node/EXIT_NODE_DEPLOYMENT_FA.md](docs/exit-node/EXIT_NODE_DEPLOYMENT_FA.md) (فارسی) +- [docs/exit-node/EXIT_NODE_DEPLOYMENT.md](docs/exit-node/EXIT_NODE_DEPLOYMENT.md) (انگلیسی) سپس همان secret را هم در کد نود خروجی (`PSK`) و هم در `config.json` یکسان بگذارید. diff --git a/EXIT_NODE_DEPLOYMENT.md b/docs/exit-node/EXIT_NODE_DEPLOYMENT.md similarity index 100% rename from EXIT_NODE_DEPLOYMENT.md rename to docs/exit-node/EXIT_NODE_DEPLOYMENT.md diff --git a/EXIT_NODE_DEPLOYMENT_FA.md b/docs/exit-node/EXIT_NODE_DEPLOYMENT_FA.md similarity index 100% rename from EXIT_NODE_DEPLOYMENT_FA.md rename to docs/exit-node/EXIT_NODE_DEPLOYMENT_FA.md diff --git a/main.py b/main.py index 2468bc0..4344ff1 100644 --- a/main.py +++ b/main.py @@ -14,23 +14,19 @@ import logging import os import sys -# Project modules live under ./src — put that folder on sys.path so the -# historical flat imports ("from proxy_server import …") keep working. +# Project modules live under ./src — add it to sys.path so package imports +# like "from proxy.proxy_server import ProxyServer" work from project root. 
_SRC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "src") if _SRC_DIR not in sys.path: sys.path.insert(0, _SRC_DIR) -from cert_installer import install_ca, uninstall_ca, is_ca_trusted -from constants import __version__ -from lan_utils import log_lan_access -from google_ip_scanner import scan_sync -from logging_utils import configure as configure_logging, print_banner -from mitm import CA_CERT_FILE -from proxy_server import ProxyServer - - -def setup_logging(level_name: str): - configure_logging(level_name) +from core.cert_installer import install_ca, uninstall_ca, is_ca_trusted +from core.constants import __version__ +from core.lan_utils import log_lan_access +from core.google_ip_scanner import scan_sync +from core.logging_utils import configure as configure_logging, print_banner +from proxy.mitm import CA_CERT_FILE +from proxy.proxy_server import ProxyServer _PLACEHOLDER_AUTH_KEYS = { @@ -111,13 +107,13 @@ def main(): # Handle cert-only commands before loading config so they can run standalone. 
if args.install_cert or args.uninstall_cert: - setup_logging("INFO") + configure_logging("INFO") _log = logging.getLogger("Main") if args.install_cert: _log.info("Installing CA certificate…") if not os.path.exists(CA_CERT_FILE): - from mitm import MITMCertManager + from proxy.mitm import MITMCertManager MITMCertManager() # side-effect: creates ca/ca.crt + ca/ca.key ok = install_ca(CA_CERT_FILE) sys.exit(0 if ok else 1) @@ -219,14 +215,14 @@ def main(): # ── Google IP Scanner ────────────────────────────────────────────────── if args.scan: - setup_logging("INFO") + configure_logging("INFO") front_domain = config.get("front_domain", "www.google.com") _log = logging.getLogger("Main") _log.info(f"Scanning Google IPs (fronting domain: {front_domain})") ok = scan_sync(front_domain) sys.exit(0 if ok else 1) - setup_logging(config.get("log_level", "INFO")) + configure_logging(config.get("log_level", "INFO")) log = logging.getLogger("Main") print_banner(__version__) @@ -245,7 +241,7 @@ def main(): # Ensure CA file exists before checking / installing it. # MITMCertManager generates ca/ca.crt on first instantiation. if not os.path.exists(CA_CERT_FILE): - from mitm import MITMCertManager + from proxy.mitm import MITMCertManager MITMCertManager() # side-effect: creates ca/ca.crt + ca/ca.key # Auto-install MITM CA if not already trusted diff --git a/scripts/benchmark_transport.py b/scripts/benchmark_transport.py new file mode 100644 index 0000000..9c876c0 --- /dev/null +++ b/scripts/benchmark_transport.py @@ -0,0 +1,694 @@ +""" +Transport protocol & connection benchmark suite. + +Tests run against Google's edge IP with SNI fronting. Four suites: + + 1. Protocol sequential — H1.1 / H2 / H3, one request at a time (apples-to-apples latency) + 2. TLS session resumption — cold connect vs warm reconnect using cached session ticket + 3. Concurrency — H2 multiplex (N streams on 1 conn) vs H1.1 parallel (N separate conns) + 4. 
IP scan — probe all candidate Google IPs to find the fastest one on this network + +Usage: + python scripts/benchmark_transport.py # reads config.json + python scripts/benchmark_transport.py --ip 216.239.38.120 --sni www.google.com + python scripts/benchmark_transport.py --suite protocol # only run suite 1 + python scripts/benchmark_transport.py --suite resumption + python scripts/benchmark_transport.py --suite concurrency + python scripts/benchmark_transport.py --suite ipscan +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import socket +import ssl +import statistics +import sys +import time +from pathlib import Path + +# ── Optional imports ────────────────────────────────────────────────────── + +try: + import h2.connection + import h2.config + import h2.events + import h2.settings + H2_AVAILABLE = True +except ImportError: + H2_AVAILABLE = False + +try: + import certifi + _CAFILE = certifi.where() +except ImportError: + _CAFILE = None + +try: + import aioquic.asyncio as quic_asyncio + import aioquic.h3.connection as h3c + import aioquic.h3.events as h3e + import aioquic.quic.configuration as quic_cfg + import aioquic.quic.events as quic_events + H3_AVAILABLE = True +except ImportError: + H3_AVAILABLE = False + + +# ── TLS context helpers ─────────────────────────────────────────────────── + +def _make_tls_ctx(alpn: list[str]) -> ssl.SSLContext: + ctx = ssl.create_default_context() + if _CAFILE: + try: + ctx.load_verify_locations(cafile=_CAFILE) + except Exception: + pass + ctx.set_alpn_protocols(alpn) + return ctx + + +# ── HTTP/1.1 probe ──────────────────────────────────────────────────────── + +async def _probe_h1(host_ip: str, sni: str, path: str, timeout: float) -> float: + """Return elapsed seconds for one H1.1 GET. 
Raises on error.""" + ctx = _make_tls_ctx(["http/1.1"]) + raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + raw.setblocking(False) + + t0 = time.perf_counter() + loop = asyncio.get_running_loop() + await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout) + reader, writer = await asyncio.wait_for( + asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw), + timeout=timeout, + ) + + req = ( + f"GET {path} HTTP/1.1\r\n" + f"Host: {sni}\r\n" + "Accept: */*\r\n" + "Connection: close\r\n" + "\r\n" + ).encode() + writer.write(req) + await asyncio.wait_for(writer.drain(), timeout=timeout) + + resp = b"" + while True: + chunk = await asyncio.wait_for(reader.read(4096), timeout=timeout) + if not chunk: + break + resp += chunk + if b"\r\n\r\n" in resp: + break + writer.close() + elapsed = time.perf_counter() - t0 + if not resp.startswith(b"HTTP/"): + raise RuntimeError(f"Unexpected response: {resp[:60]!r}") + return elapsed + + +# ── HTTP/2 probe ────────────────────────────────────────────────────────── + +async def _probe_h2_fresh(host_ip: str, sni: str, path: str, timeout: float) -> float: + """One H2 GET on a NEW connection each time (apples-to-apples vs H1).""" + if not H2_AVAILABLE: + raise RuntimeError("h2 not installed") + + ctx = _make_tls_ctx(["h2", "http/1.1"]) + raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + raw.setblocking(False) + + t0 = time.perf_counter() + loop = asyncio.get_running_loop() + await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout) + reader, writer = await asyncio.wait_for( + asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw), + timeout=timeout, + ) + + ssl_obj = writer.get_extra_info("ssl_object") + negotiated = ssl_obj.selected_alpn_protocol() if ssl_obj else None + if negotiated != "h2": + writer.close() + raise RuntimeError(f"H2 ALPN 
failed (got {negotiated!r})") + + cfg = h2.config.H2Configuration(client_side=True, header_encoding="utf-8") + conn = h2.connection.H2Connection(cfg) + conn.initiate_connection() + writer.write(conn.data_to_send(65535)) + await writer.drain() + + stream_id = conn.get_next_available_stream_id() + conn.send_headers(stream_id, [ + (":method", "GET"), + (":path", path), + (":scheme", "https"), + (":authority", sni), + ("accept", "*/*"), + ], end_stream=True) + writer.write(conn.data_to_send(65535)) + await asyncio.wait_for(writer.drain(), timeout=timeout) + + headers_done = False + while not headers_done: + raw_data = await asyncio.wait_for(reader.read(65535), timeout=timeout) + if not raw_data: + break + events = conn.receive_data(raw_data) + writer.write(conn.data_to_send(65535)) + await writer.drain() + for ev in events: + if isinstance(ev, (h2.events.ResponseReceived, h2.events.StreamEnded, + h2.events.DataReceived)): + if isinstance(ev, h2.events.ResponseReceived) and ev.stream_id == stream_id: + headers_done = True + + writer.close() + return time.perf_counter() - t0 + + +# ── HTTP/3 (QUIC) probe ─────────────────────────────────────────────────── + +class _H3ProbeProtocol(quic_asyncio.QuicConnectionProtocol): + """Minimal aioquic protocol that sends one H3 GET and captures the result.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._h3: h3c.H3Connection | None = None + self._done: asyncio.Future[float] = asyncio.get_event_loop().create_future() + self._t0: float = time.perf_counter() + self._stream_id: int | None = None + + def quic_event_received(self, event): + if isinstance(event, quic_events.HandshakeCompleted): + self._h3 = h3c.H3Connection(self._quic, enable_webtransport=False) + if self._h3 is None: + return + for h3ev in self._h3.handle_event(event): + if isinstance(h3ev, h3e.HeadersReceived): + if not self._done.done(): + self._done.set_result(time.perf_counter() - self._t0) + elif isinstance(h3ev, 
h3e.DataReceived): + pass # don't need body + + def send_request(self, sni: str, path: str): + self._stream_id = self._quic.get_next_available_stream_id() + self._h3.send_headers( + stream_id=self._stream_id, + headers=[ + (b":method", b"GET"), + (b":path", path.encode()), + (b":scheme", b"https"), + (b":authority", sni.encode()), + (b"accept", b"*/*"), + ], + end_stream=True, + ) + self.transmit() + + +async def _h3_inner(host_ip: str, sni: str, path: str, timeout: float) -> float: + cfg = quic_cfg.QuicConfiguration( + is_client=True, + server_name=sni, + alpn_protocols=h3c.H3_ALPN, + verify_mode=ssl.CERT_REQUIRED, + ) + if _CAFILE: + try: + cfg.load_verify_locations(_CAFILE) + except Exception: + pass + + t0 = time.perf_counter() + async with quic_asyncio.connect( + host_ip, + 443, + configuration=cfg, + create_protocol=_H3ProbeProtocol, + ) as proto: + proto._t0 = t0 + proto.send_request(sni, path) + return await proto._done + + +async def _probe_h3(host_ip: str, sni: str, path: str, timeout: float) -> float: + if not H3_AVAILABLE: + raise RuntimeError("aioquic not installed") + + # QUIC uses UDP. Wrap the ENTIRE connect+request in wait_for so a + # network that silently drops UDP packets doesn't stall indefinitely. 
+ h3_timeout = min(timeout, 5.0) + try: + return await asyncio.wait_for(_h3_inner(host_ip, sni, path, h3_timeout), timeout=h3_timeout) + except asyncio.TimeoutError: + raise TimeoutError(f"QUIC/UDP timed out after {h3_timeout:.1f}s — UDP likely blocked or no H3 support") + except Exception as exc: + raise RuntimeError(f"{type(exc).__name__}: {exc or 'no detail'}") + + +# ── Runner ──────────────────────────────────────────────────────────────── + +async def _run_protocol( + name: str, + probe, + host_ip: str, + sni: str, + path: str, + n: int, + timeout: float, +) -> dict: + times: list[float] = [] + errors = 0 + for i in range(n): + try: + t = await probe(host_ip, sni, path, timeout) + times.append(t) + except Exception as exc: + errors += 1 + desc = str(exc) or type(exc).__name__ + print(f" [{name}] request {i+1}/{n} FAILED: {desc}") + # If the first 3 all failed, give up early to avoid wasting time. + if errors >= 3 and not times: + print(f" [{name}] 3 consecutive failures with no success — aborting protocol test") + break + await asyncio.sleep(0.05) # tiny gap between probes + + return {"name": name, "times": times, "errors": errors, "n": n} + + +def _print_result(r: dict): + name = r["name"] + times = r["times"] + errors = r["errors"] + n = r["n"] + ok = len(times) + + if not times: + print(f" {name:10s} NO SUCCESSFUL REQUESTS (errors={errors}/{n})") + return + + mn = min(times) * 1000 + mx = max(times) * 1000 + avg = statistics.mean(times) * 1000 + med = statistics.median(times) * 1000 + p95 = sorted(times)[int(len(times) * 0.95)] * 1000 + + print( + f" {name:10s} " + f"ok={ok}/{n} " + f"min={mn:6.1f}ms " + f"avg={avg:6.1f}ms " + f"med={med:6.1f}ms " + f"p95={p95:6.1f}ms " + f"max={mx:6.1f}ms " + f"errors={errors}" + ) + + +async def main(host_ip: str, sni: str, path: str, n: int, timeout: float, + suite: str = "all"): + print(f"\nBenchmark target → {host_ip}:443 SNI={sni} path={path}") + print("=" * 80) + + run_all = suite == "all" + + # ── Suite 1: Protocol 
sequential ────────────────────────────────────── + if run_all or suite == "protocol": + print("\n── Suite 1: Protocol sequential latency ──────────────────────────────") + print(f" {n} sequential requests per protocol\n") + + protocols: list[tuple[str, object]] = [("HTTP/1.1", _probe_h1)] + if H2_AVAILABLE: + protocols.append(("HTTP/2", _probe_h2_fresh)) + else: + print(" [HTTP/2] skipped — pip install h2") + if H3_AVAILABLE: + protocols.append(("HTTP/3", _probe_h3)) + else: + print(" [HTTP/3] skipped — pip install aioquic") + + results = [] + for name, probe in protocols: + print(f" Running {name}...") + r = await _run_protocol(name, probe, host_ip, sni, path, n, timeout) + results.append(r) + + print() + for r in results: + _print_result(r) + + valid = [r for r in results if r["times"]] + if len(valid) > 1: + best = min(valid, key=lambda r: statistics.median(r["times"])) + print(f"\n Best median: {best['name']}") + h1r = next((r for r in valid if r["name"] == "HTTP/1.1"), None) + h2r = next((r for r in valid if r["name"] == "HTTP/2"), None) + h3r = next((r for r in valid if r["name"] == "HTTP/3"), None) + if h2r and h1r: + g = (statistics.median(h1r["times"]) - statistics.median(h2r["times"])) \ + / statistics.median(h1r["times"]) * 100 + print(f" H2 vs H1.1: {g:+.1f}%") + if h3r and h2r: + g = (statistics.median(h2r["times"]) - statistics.median(h3r["times"])) \ + / statistics.median(h2r["times"]) * 100 + print(f" H3 vs H2: {g:+.1f}%") + + # ── Suite 2: TLS session resumption ─────────────────────────────────── + if run_all or suite == "resumption": + print("\n── Suite 2: TLS session resumption ───────────────────────────────────") + print(" Measures cost of cold TLS handshake vs warm reconnect with session ticket\n") + await _suite_resumption(host_ip, sni, path, timeout, rounds=8) + + # ── Suite 3: Concurrency ────────────────────────────────────────────── + if run_all or suite == "concurrency": + print("\n── Suite 3: Concurrency — H2 multiplex vs H1.1 
parallel ─────────────") + print(f" {n} concurrent requests fired simultaneously\n") + await _suite_concurrency(host_ip, sni, path, timeout, n=n) + + # ── Suite 4: IP scan ────────────────────────────────────────────────── + if run_all or suite == "ipscan": + print("\n── Suite 4: Google edge IP latency scan ──────────────────────────────") + print(" H1.1 probe to all candidate IPs — find the fastest one on this network\n") + await _suite_ipscan(sni, path, timeout) + + print("\n" + "=" * 80) + print("Done.") + + +# ── Suite 2: TLS session resumption ────────────────────────────────────── + +async def _tls_connect_time(host_ip: str, sni: str, timeout: float, + ctx: ssl.SSLContext | None = None) -> tuple[float, ssl.SSLContext]: + """Connect with TLS and return (elapsed, ctx). ctx is reused for warm tests.""" + if ctx is None: + ctx = _make_tls_ctx(["h2", "http/1.1"]) + + raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + raw.setblocking(False) + + loop = asyncio.get_running_loop() + t0 = time.perf_counter() + await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout) + reader, writer = await asyncio.wait_for( + asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw), + timeout=timeout, + ) + elapsed = time.perf_counter() - t0 + # Send minimal request so the server doesn't RST the idle connection + writer.write(f"GET /generate_204 HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n".encode()) + await asyncio.wait_for(writer.drain(), timeout=timeout) + try: + await asyncio.wait_for(reader.read(256), timeout=timeout) + except Exception: + pass + writer.close() + return elapsed, ctx + + +async def _suite_resumption(host_ip: str, sni: str, path: str, + timeout: float, rounds: int): + cold_times: list[float] = [] + warm_times: list[float] = [] + + # cold: fresh SSLContext each time — no session ticket reuse + print(" Cold connects (new TLS context each time)...") + for _ in 
range(rounds): + try: + t, _ = await _tls_connect_time(host_ip, sni, timeout, ctx=None) + cold_times.append(t * 1000) + except Exception as exc: + print(f" FAILED: {exc}") + await asyncio.sleep(0.1) + + # warm: reuse same SSLContext — OpenSSL caches and reuses TLS 1.3 session ticket + print(" Warm reconnects (same TLS context, session ticket reuse)...") + warm_ctx = _make_tls_ctx(["h2", "http/1.1"]) + for _ in range(rounds): + try: + t, warm_ctx = await _tls_connect_time(host_ip, sni, timeout, ctx=warm_ctx) + warm_times.append(t * 1000) + except Exception as exc: + print(f" FAILED: {exc}") + await asyncio.sleep(0.1) + + def _fmt(times: list[float]) -> str: + if not times: + return "no data" + return (f"min={min(times):.1f}ms avg={statistics.mean(times):.1f}ms " + f"med={statistics.median(times):.1f}ms max={max(times):.1f}ms") + + print(f"\n Cold ({len(cold_times)}/{rounds} ok): {_fmt(cold_times)}") + print(f" Warm ({len(warm_times)}/{rounds} ok): {_fmt(warm_times)}") + + if cold_times and warm_times: + saving = statistics.median(cold_times) - statistics.median(warm_times) + pct = saving / statistics.median(cold_times) * 100 + if saving > 5: + print(f"\n Session ticket saves ~{saving:.1f}ms ({pct:.1f}%) per reconnect") + print(" → The H2 transport already reuses one long-lived connection, so this") + print(" saving only applies when the connection drops and must reconnect.") + else: + print(f"\n Resumption saving: {saving:.1f}ms ({pct:.1f}%) — negligible on this network") + print(" → Google may be issuing short-lived tickets, or RTT already dominates.") + + +# ── Suite 3: Concurrency ────────────────────────────────────────────────── + +async def _h2_concurrent(host_ip: str, sni: str, path: str, + timeout: float, n: int) -> tuple[float, int]: + """ + Fire N H2 streams concurrently on ONE persistent connection. + Returns (wall_time_for_all, successful_count). 
+ """ + if not H2_AVAILABLE: + raise RuntimeError("h2 not installed") + + ctx = _make_tls_ctx(["h2", "http/1.1"]) + raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + raw.setblocking(False) + loop = asyncio.get_running_loop() + await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout) + reader, writer = await asyncio.wait_for( + asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw), + timeout=timeout, + ) + ssl_obj = writer.get_extra_info("ssl_object") + if not ssl_obj or ssl_obj.selected_alpn_protocol() != "h2": + writer.close() + raise RuntimeError("H2 ALPN not negotiated") + + cfg = h2.config.H2Configuration(client_side=True, header_encoding="utf-8") + conn = h2.connection.H2Connection(cfg) + conn.initiate_connection() + conn.increment_flow_control_window(2 ** 24 - 65535) + conn.update_settings({ + h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 8 * 1024 * 1024, + h2.settings.SettingCodes.ENABLE_PUSH: 0, + }) + writer.write(conn.data_to_send(65535)) + await writer.drain() + + # Track per-stream completion + stream_done: dict[int, asyncio.Event] = {} + stream_ids = [] + for _ in range(n): + sid = conn.get_next_available_stream_id() + conn.send_headers(sid, [ + (":method", "GET"), (":path", path), + (":scheme", "https"), (":authority", sni), ("accept", "*/*"), + ], end_stream=True) + stream_ids.append(sid) + stream_done[sid] = asyncio.Event() + + writer.write(conn.data_to_send(65535)) + await writer.drain() + + t0 = time.perf_counter() + done_count = 0 + deadline = t0 + timeout + + while done_count < n and time.perf_counter() < deadline: + try: + raw_data = await asyncio.wait_for( + reader.read(65535), + timeout=max(0.1, deadline - time.perf_counter()), + ) + except asyncio.TimeoutError: + break + if not raw_data: + break + events = conn.receive_data(raw_data) + writer.write(conn.data_to_send(65535)) + await writer.drain() + for ev in events: + if isinstance(ev, 
(h2.events.ResponseReceived, h2.events.StreamEnded)): + sid = ev.stream_id + if sid in stream_done and not stream_done[sid].is_set(): + if isinstance(ev, h2.events.ResponseReceived): + stream_done[sid].set() + done_count += 1 + elif isinstance(ev, h2.events.DataReceived): + conn.acknowledge_received_data(ev.flow_controlled_length, ev.stream_id) + writer.write(conn.data_to_send(65535)) + await writer.drain() + + wall = time.perf_counter() - t0 + writer.close() + return wall, done_count + + +async def _h1_parallel(host_ip: str, sni: str, path: str, + timeout: float, n: int) -> tuple[float, int]: + """Fire N H1.1 requests in parallel, each on its own TCP+TLS connection.""" + t0 = time.perf_counter() + tasks = [asyncio.create_task(_probe_h1(host_ip, sni, path, timeout)) for _ in range(n)] + results = await asyncio.gather(*tasks, return_exceptions=True) + wall = time.perf_counter() - t0 + ok = sum(1 for r in results if isinstance(r, float)) + return wall, ok + + +async def _suite_concurrency(host_ip: str, sni: str, path: str, + timeout: float, n: int): + concur_levels = sorted({4, 8, min(16, n), min(n, 20)}) + + print(f" {'Level':>5} {'H2 mux wall':>14} {'H1.1 parallel wall':>18} {'speedup':>8}") + print(f" {'-----':>5} {'----------':>14} {'----------------':>18} {'-------':>8}") + + for level in concur_levels: + h2_wall = h2_ok = h1_wall = h1_ok = None + h2_err = h1_err = None + + if H2_AVAILABLE: + try: + h2_wall, h2_ok = await _h2_concurrent(host_ip, sni, path, timeout, level) + except Exception as exc: + h2_err = str(exc) or type(exc).__name__ + + try: + h1_wall, h1_ok = await _h1_parallel(host_ip, sni, path, timeout, level) + except Exception as exc: + h1_err = str(exc) or type(exc).__name__ + + h2_str = f"{h2_wall*1000:6.0f}ms ({h2_ok}/{level})" if h2_wall is not None else f"FAIL: {h2_err}" + h1_str = f"{h1_wall*1000:6.0f}ms ({h1_ok}/{level})" if h1_wall is not None else f"FAIL: {h1_err}" + + if h2_wall and h1_wall and h1_wall > 0: + speedup = f"{h1_wall / 
h2_wall:+.2f}x" + else: + speedup = "n/a" + + print(f" {level:>5} {h2_str:>14} {h1_str:>18} {speedup:>8}") + await asyncio.sleep(0.2) + + print() + print(" Interpretation:") + print(" - H2 mux fires all streams on ONE TLS connection — lower overhead at scale") + print(" - H1.1 parallel opens N separate connections — higher per-connection TLS cost") + print(" - Speedup > 1.0x means H2 mux completed all requests in less wall time") + + +# ── Suite 4: IP scan ────────────────────────────────────────────────────── + +_CANDIDATE_IPS = ( + "216.239.32.120", "216.239.34.120", "216.239.36.120", "216.239.38.120", + "142.250.80.142", "142.250.80.138", "142.250.179.110", "142.250.185.110", + "142.250.184.206", "142.250.190.238", "142.250.191.78", + "172.217.1.206", "172.217.14.206", "172.217.16.142", "172.217.22.174", + "172.217.164.110","172.217.168.206","172.217.169.206", + "34.107.221.82", + "142.251.32.110", "142.251.33.110", "142.251.46.206", "142.251.46.238", + "142.250.80.170", "142.250.72.206", "142.250.64.206", "142.250.72.110", +) + + +async def _probe_ip(ip: str, sni: str, path: str, timeout: float) -> tuple[str, float | None, str]: + """Return (ip, median_ms_or_None, note).""" + times = [] + for _ in range(3): + try: + t = await _probe_h1(ip, sni, path, timeout) + times.append(t * 1000) + except Exception: + pass + await asyncio.sleep(0.03) + if not times: + return ip, None, "unreachable" + med = statistics.median(times) + return ip, med, "" + + +async def _suite_ipscan(sni: str, path: str, timeout: float): + ip_timeout = min(timeout, 5.0) + print(f" Probing {len(_CANDIDATE_IPS)} candidate IPs (3 requests each, {ip_timeout:.0f}s cap)...\n") + + # Run all probes concurrently — they're independent H1.1 connects + tasks = [asyncio.create_task(_probe_ip(ip, sni, path, ip_timeout)) + for ip in _CANDIDATE_IPS] + raw_results = await asyncio.gather(*tasks) + + reachable = [(ip, med, note) for ip, med, note in raw_results if med is not None] + dead = [(ip, med, note) for 
ip, med, note in raw_results if med is None] + + reachable.sort(key=lambda x: x[1]) + + print(f" {'IP':>18} {'median':>9} note") + print(f" {'--':>18} {'------':>9} ----") + for i, (ip, med, _) in enumerate(reachable): + tag = " ← fastest" if i == 0 else (" ← 2nd" if i == 1 else "") + print(f" {ip:>18} {med:7.1f}ms{tag}") + + if dead: + print(f"\n Unreachable ({len(dead)}): {', '.join(ip for ip, *_ in dead)}") + + if reachable: + best_ip, best_med, _ = reachable[0] + print(f"\n Fastest IP: {best_ip} (median {best_med:.1f}ms)") + print(f' Set in config.json: "google_ip": "{best_ip}"') + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Transport benchmark suite") + parser.add_argument("--ip", help="Google edge IP (default: from config.json)") + parser.add_argument("--sni", default="www.google.com", help="SNI hostname") + parser.add_argument("--path", default="/generate_204", help="Request path") + parser.add_argument("--n", type=int, default=15, help="Requests per protocol") + parser.add_argument("--timeout", type=float, default=10.0, help="Per-request timeout (s)") + parser.add_argument( + "--suite", + choices=["all", "protocol", "resumption", "concurrency", "ipscan"], + default="all", + help="Which benchmark suite to run (default: all)", + ) + args = parser.parse_args() + + host_ip = args.ip + if not host_ip: + cfg_path = Path(__file__).parent.parent / "config.json" + if cfg_path.exists(): + with open(cfg_path) as f: + data = json.load(f) + host_ip = data.get("google_ip", "216.239.38.120") + print(f"Using google_ip from config.json: {host_ip}") + else: + host_ip = "216.239.38.120" + print(f"config.json not found, using default: {host_ip}") + + asyncio.run(main( + host_ip=host_ip, + sni=args.sni, + path=args.path, + n=args.n, + timeout=args.timeout, + suite=args.suite, + )) diff --git a/scripts/build_release_bundle.py b/scripts/build_release_bundle.py deleted file mode 100644 index ce57a7e..0000000 --- a/scripts/build_release_bundle.py 
+++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import hashlib -import os -import re -import shutil -import tarfile -import zipfile -from pathlib import Path - - -def _read_version(root: Path) -> str: - constants_py = (root / "src" / "constants.py").read_text(encoding="utf-8") - m = re.search(r'__version__\s*=\s*"([^"]+)"', constants_py) - return m.group(1) if m else "0.0.0" - - -def main() -> int: - root = Path(".").resolve() - target = os.environ.get("TARGET", "") - if not target: - raise SystemExit("TARGET environment variable is required") - - version = _read_version(root) - binary_name = "MasterHttpRelayVPN.exe" if os.name == "nt" else "MasterHttpRelayVPN" - binary_path = root / "dist" / binary_name - if not binary_path.exists(): - raise SystemExit(f"binary not found: {binary_path}") - - bundle_name = f"MasterHttpRelayVPN-{version}-{target}" - bundle_root = root / "package" / bundle_name - if bundle_root.exists(): - shutil.rmtree(bundle_root) - bundle_root.mkdir(parents=True, exist_ok=True) - config_example = root / "config.example.json" - if not config_example.exists(): - raise SystemExit(f"missing config.example.json: {config_example}") - shutil.copy2(config_example, bundle_root / "config.example.json") - shutil.copy2(binary_path, bundle_root / binary_name) - - if os.name != "nt": - (bundle_root / binary_name).chmod(0o755) - - release_dir = root / "release-assets" - release_dir.mkdir(parents=True, exist_ok=True) - - if target.startswith("windows"): - archive = release_dir / f"{bundle_name}.zip" - with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf: - for path in bundle_root.rglob("*"): - zf.write(path, path.relative_to(bundle_root.parent)) - else: - archive = release_dir / f"{bundle_name}.tar.gz" - with tarfile.open(archive, "w:gz") as tf: - tf.add(bundle_root, arcname=bundle_name) - - digest = hashlib.sha256(archive.read_bytes()).hexdigest() - (release_dir / 
f"{archive.name}.sha256").write_text( - f"{digest} {archive.name}\n", - encoding="utf-8", - ) - - print(f"Created {archive}") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..0b577df --- /dev/null +++ b/src/core/__init__.py @@ -0,0 +1,6 @@ +from .constants import * +from .codec import * +from .logging_utils import * +from .cert_installer import * +from .lan_utils import * +from .google_ip_scanner import * diff --git a/src/cert_installer.py b/src/core/cert_installer.py similarity index 99% rename from src/cert_installer.py rename to src/core/cert_installer.py index 704a4a0..1bfba87 100644 --- a/src/cert_installer.py +++ b/src/core/cert_installer.py @@ -5,7 +5,7 @@ Supports: Windows, macOS, Linux (Debian/Ubuntu, RHEL/Fedora/CentOS, Arch). Also attempts to install into Firefox's NSS certificate store when found. Usage: - from cert_installer import install_ca, is_ca_trusted + from core.cert_installer import install_ca, is_ca_trusted install_ca("/path/to/ca.crt", cert_name="MasterHttpRelayVPN") """ diff --git a/src/codec.py b/src/core/codec.py similarity index 97% rename from src/codec.py rename to src/core/codec.py index f8d617a..f31dc69 100644 --- a/src/codec.py +++ b/src/core/codec.py @@ -30,6 +30,9 @@ except ImportError: # pragma: no cover _ZSTD_DCTX = None +__all__ = ["supported_encodings", "has_brotli", "has_zstd", "decode"] + + def supported_encodings() -> str: """Value for Accept-Encoding that this relay can actually decode.""" codecs = ["gzip", "deflate"] diff --git a/src/constants.py b/src/core/constants.py similarity index 100% rename from src/constants.py rename to src/core/constants.py diff --git a/src/google_ip_scanner.py b/src/core/google_ip_scanner.py similarity index 98% rename from src/google_ip_scanner.py rename to src/core/google_ip_scanner.py index 79edc51..9a17c5a 100644 --- a/src/google_ip_scanner.py +++ b/src/core/google_ip_scanner.py 
@@ -15,7 +15,7 @@ import time from dataclasses import dataclass from typing import Optional -from constants import CANDIDATE_IPS, GOOGLE_SCANNER_TIMEOUT, GOOGLE_SCANNER_CONCURRENCY +from .constants import CANDIDATE_IPS, GOOGLE_SCANNER_TIMEOUT, GOOGLE_SCANNER_CONCURRENCY log = logging.getLogger("Scanner") diff --git a/src/lan_utils.py b/src/core/lan_utils.py similarity index 100% rename from src/lan_utils.py rename to src/core/lan_utils.py diff --git a/src/logging_utils.py b/src/core/logging_utils.py similarity index 83% rename from src/logging_utils.py rename to src/core/logging_utils.py index 55162d7..3c49952 100644 --- a/src/logging_utils.py +++ b/src/core/logging_utils.py @@ -227,44 +227,43 @@ def _install_asyncio_noise_filter() -> None: def print_banner(version: str, *, stream=None) -> None: - """Print a polished startup banner with color fallbacks.""" + """Print an ASCII startup banner with color fallbacks.""" stream = stream or sys.stderr color = _supports_color(stream) def c(code: str) -> str: return code if color else "" - title = "MasterHttpRelayVPN" - subtitle = "Domain-Fronted Apps Script Relay" - version_tag = f"v{version}" - - left = f" {title} " - center = f" {subtitle} " - right = f" {version_tag} " - inner_width = max(68, len(left) + len(center) + len(right) + 2) - - gap = inner_width - (len(left) + len(center) + len(right)) - left_gap = gap // 2 - right_gap = gap - left_gap - - top = "╭" + ("─" * inner_width) + "╮" - mid = "│" + left + (" " * left_gap) + center + (" " * right_gap) + right + "│" - bot = "╰" + ("─" * inner_width) + "╯" + art = [ + " __ __ _ ____ _____ _____ ____ ", + "| \\/ | / \\ / ___|_ _| ____| _ \\ ", + "| |\\/| | / _ \\ \\___ \\ | | | _| | |_) |", + "| | | |/ ___ \\ ___) || | | |___| _ < ", + "|_| |_/_/ \\_\\____/ |_| |_____|_| \\_\\", + " _ _ _____ _____ ____ ____ _____ _ _ __ __", + " | | | |_ _|_ _| _ \\ | _ \\| ____| | / \\\\ \\ / /", + " | |_| | | | | | | |_) | | |_) | _| | | / _ \\\\ V / ", + " | _ | | | | | | __/ | _ <| 
|___| |___ / ___ \\| | ", + " |_| |_| |_| |_| |_| |_| \\_\\_____|_____/_/ \\_\\_| ", + ] + version_line = f"Version {version}" + link = "https://github.com/masterking32/MasterHttpRelayVPN" + width = max(max(len(line) for line in art), len(version_line), len(link)) + rule = "=" * width if color: - top = f"{DIM}{FG_GRAY}{top}{RESET}" - bot = f"{DIM}{FG_GRAY}{bot}{RESET}" - mid = ( - f"{DIM}{FG_GRAY}│{RESET}" - f"{BOLD}{FG_CYAN}{left}{RESET}" - f"{' ' * left_gap}" - f"{FG_GRAY}{center}{RESET}" - f"{' ' * right_gap}" - f"{BOLD}{FG_TEAL}{right}{RESET}" - f"{DIM}{FG_GRAY}│{RESET}" - ) + print(f"{DIM}{FG_GRAY}{rule}{RESET}", file=stream) + for line in art: + print(f"{BOLD}{FG_CYAN}{line.center(width)}{RESET}", file=stream) + print(f"{FG_GRAY}{version_line.center(width)}{RESET}", file=stream) + print(f"{FG_TEAL}{link.center(width)}{RESET}", file=stream) + print(f"{DIM}{FG_GRAY}{rule}{RESET}", file=stream) + else: + print(rule, file=stream) + for line in art: + print(line.center(width), file=stream) + print(version_line.center(width), file=stream) + print(link.center(width), file=stream) + print(rule, file=stream) - print(top, file=stream) - print(mid, file=stream) - print(bot, file=stream) stream.flush() diff --git a/src/proxy/__init__.py b/src/proxy/__init__.py new file mode 100644 index 0000000..aad5321 --- /dev/null +++ b/src/proxy/__init__.py @@ -0,0 +1,5 @@ +from .proxy_server import ProxyServer +from .proxy_support import * +from .socks5 import * +from .mitm import * +__all__ = ["ProxyServer"] diff --git a/src/mitm.py b/src/proxy/mitm.py similarity index 97% rename from src/mitm.py rename to src/proxy/mitm.py index 882581b..ec04194 100644 --- a/src/mitm.py +++ b/src/proxy/mitm.py @@ -24,10 +24,10 @@ from cryptography.x509.oid import NameOID log = logging.getLogger("MITM") -# CA lives at the project root (../ca/ relative to this file in src/). -# The installed trusted root was generated there; keep using it. 
+# Keep the CA at repository root so docs/installer paths stay stable. _THIS_DIR = os.path.dirname(os.path.abspath(__file__)) -_PROJECT_ROOT = os.path.dirname(_THIS_DIR) +_SRC_DIR = os.path.dirname(_THIS_DIR) +_PROJECT_ROOT = os.path.dirname(_SRC_DIR) CA_DIR = os.path.join(_PROJECT_ROOT, "ca") CA_KEY_FILE = os.path.join(CA_DIR, "ca.key") CA_CERT_FILE = os.path.join(CA_DIR, "ca.crt") diff --git a/src/proxy_server.py b/src/proxy/proxy_server.py similarity index 77% rename from src/proxy_server.py rename to src/proxy/proxy_server.py index ce00a04..5a09789 100644 --- a/src/proxy_server.py +++ b/src/proxy/proxy_server.py @@ -13,18 +13,14 @@ import socket import ssl import time import ipaddress -from urllib.parse import urlparse try: import certifi except Exception: # optional dependency fallback certifi = None -from constants import ( +from core.constants import ( CACHE_MAX_MB, - CACHE_TTL_MAX, - CACHE_TTL_STATIC_LONG, - CACHE_TTL_STATIC_MED, CLIENT_IDLE_TIMEOUT, GOOGLE_DIRECT_ALLOW_EXACT, GOOGLE_DIRECT_ALLOW_SUFFIXES, @@ -36,132 +32,29 @@ from constants import ( MAX_HEADER_BYTES, MAX_REQUEST_BODY_BYTES, SNI_REWRITE_SUFFIXES, - STATIC_EXTS, TCP_CONNECT_TIMEOUT, TRACE_HOST_SUFFIXES, UNCACHEABLE_HEADER_NAMES, ) -from domain_fronter import DomainFronter +from relay.domain_fronter import DomainFronter +from .socks5 import negotiate_socks5 +from .proxy_support import ( + ResponseCache, + cors_preflight_response, + has_unsupported_transfer_encoding, + header_value, + host_matches_rules, + inject_cors_headers, + is_ip_literal, + load_host_rules, + log_response_summary, + parse_content_length, +) +from relay.relay_response import split_raw_response log = logging.getLogger("Proxy") -def _is_ip_literal(host: str) -> bool: - """True for IPv4/IPv6 literals (strips brackets around IPv6).""" - h = host.strip("[]") - try: - ipaddress.ip_address(h) - return True - except ValueError: - return False - - -def _parse_content_length(header_block: bytes) -> int: - """Return Content-Length or 
0. Matches only the exact header name.""" - for raw_line in header_block.split(b"\r\n"): - name, sep, value = raw_line.partition(b":") - if not sep: - continue - if name.strip().lower() == b"content-length": - try: - return int(value.strip()) - except ValueError: - return 0 - return 0 - - -def _has_unsupported_transfer_encoding(header_block: bytes) -> bool: - """True when the request uses Transfer-Encoding, which we don't stream.""" - for raw_line in header_block.split(b"\r\n"): - name, sep, value = raw_line.partition(b":") - if not sep: - continue - if name.strip().lower() != b"transfer-encoding": - continue - encodings = [ - token.strip().lower() - for token in value.decode(errors="replace").split(",") - if token.strip() - ] - return any(token != "identity" for token in encodings) - return False - - -class ResponseCache: - """Simple LRU response cache — avoids repeated relay calls.""" - - def __init__(self, max_mb: int = 50): - self._store: dict[str, tuple[bytes, float]] = {} - self._size = 0 - self._max = max_mb * 1024 * 1024 - self.hits = 0 - self.misses = 0 - - def get(self, url: str) -> bytes | None: - entry = self._store.get(url) - if not entry: - self.misses += 1 - return None - raw, expires = entry - if time.time() > expires: - self._size -= len(raw) - del self._store[url] - self.misses += 1 - return None - self.hits += 1 - return raw - - def put(self, url: str, raw_response: bytes, ttl: int = 300): - size = len(raw_response) - if size > self._max // 4 or size == 0: - return - # Evict oldest to make room - while self._size + size > self._max and self._store: - oldest = next(iter(self._store)) - self._size -= len(self._store[oldest][0]) - del self._store[oldest] - if url in self._store: - self._size -= len(self._store[url][0]) - self._store[url] = (raw_response, time.time() + ttl) - self._size += size - - @staticmethod - def parse_ttl(raw_response: bytes, url: str) -> int: - """Determine cache TTL from response headers and URL.""" - hdr_end = 
raw_response.find(b"\r\n\r\n") - if hdr_end < 0: - return 0 - hdr = raw_response[:hdr_end].decode(errors="replace").lower() - - # Don't cache errors or non-200 - if b"HTTP/1.1 200" not in raw_response[:20]: - return 0 - if "no-store" in hdr or "private" in hdr or "set-cookie:" in hdr: - return 0 - - # Explicit max-age - m = re.search(r"max-age=(\d+)", hdr) - if m: - return min(int(m.group(1)), CACHE_TTL_MAX) - - # Heuristic by content type / extension - path = url.split("?")[0].lower() - for ext in STATIC_EXTS: - if path.endswith(ext): - return CACHE_TTL_STATIC_LONG - - ct_m = re.search(r"content-type:\s*([^\r\n]+)", hdr) - ct = ct_m.group(1) if ct_m else "" - if "image/" in ct or "font/" in ct: - return CACHE_TTL_STATIC_LONG - if "text/css" in ct or "javascript" in ct: - return CACHE_TTL_STATIC_MED - if "text/html" in ct or "application/json" in ct: - return 0 # don't cache dynamic content by default - - return 0 - - class ProxyServer: # Pulled from constants.py so users can override any subset via config. _GOOGLE_DIRECT_EXACT_EXCLUDE = GOOGLE_DIRECT_EXACT_EXCLUDE @@ -246,8 +139,8 @@ class ProxyServer: # bypass_hosts — route directly (no MITM, no relay) # Both accept exact hostnames and leading-dot suffix patterns, # e.g. ".local" matches any *.local domain. - self._block_hosts = self._load_host_rules(config.get("block_hosts", [])) - self._bypass_hosts = self._load_host_rules(config.get("bypass_hosts", [])) + self._block_hosts = load_host_rules(config.get("block_hosts", [])) + self._bypass_hosts = load_host_rules(config.get("bypass_hosts", [])) # Route YouTube through the relay when requested; the Google frontend # IP can enforce SafeSearch on the SNI-rewrite path. 
@@ -261,7 +154,7 @@ class ProxyServer: self._SNI_REWRITE_SUFFIXES = SNI_REWRITE_SUFFIXES try: - from mitm import MITMCertManager + from .mitm import MITMCertManager self.mitm = MITMCertManager() except ImportError: log.error("Apps Script relay requires the 'cryptography' package.") @@ -319,135 +212,21 @@ class ProxyServer: if task is not None: self._client_tasks.discard(task) - @staticmethod - def _load_host_rules(raw) -> tuple[set[str], tuple[str, ...]]: - """Accept a list of host strings; return (exact_set, suffix_tuple). - - A rule starting with '.' (e.g. ".internal") is a suffix rule. - Everything else is treated as an exact match. Case-insensitive. - """ - exact: set[str] = set() - suffixes: list[str] = [] - for item in raw or []: - h = str(item).strip().lower().rstrip(".") - if not h: - continue - if h.startswith("."): - suffixes.append(h) - else: - exact.add(h) - return exact, tuple(suffixes) - - @staticmethod - def _host_matches_rules(host: str, - rules: tuple[set[str], tuple[str, ...]]) -> bool: - exact, suffixes = rules - h = host.lower().rstrip(".") - if h in exact: - return True - for s in suffixes: - if h.endswith(s): - return True - return False - def _is_blocked(self, host: str) -> bool: - return self._host_matches_rules(host, self._block_hosts) + return host_matches_rules(host, self._block_hosts) def _is_bypassed(self, host: str) -> bool: - return self._host_matches_rules(host, self._bypass_hosts) - - @staticmethod - def _header_value(headers: dict | None, name: str) -> str: - if not headers: - return "" - for key, value in headers.items(): - if key.lower() == name: - return str(value) - return "" + return host_matches_rules(host, self._bypass_hosts) def _cache_allowed(self, method: str, url: str, headers: dict | None, body: bytes) -> bool: if method.upper() != "GET" or body: return False for name in UNCACHEABLE_HEADER_NAMES: - if self._header_value(headers, name): + if header_value(headers, name): return False return 
self.fronter._is_static_asset_url(url) - @classmethod - def _should_trace_host(cls, host: str) -> bool: - h = host.lower().rstrip(".") - return any( - token == h or token in h or h.endswith("." + token) - for token in cls._TRACE_HOST_SUFFIXES - ) - - def _log_response_summary(self, url: str, response: bytes): - status, headers, body = self.fronter._split_raw_response(response) - host = (urlparse(url).hostname or "").lower() - - if status >= 300 or self._should_trace_host(host): - location = headers.get("location", "") or "-" - server = headers.get("server", "") or "-" - cf_ray = headers.get("cf-ray", "") or "-" - content_type = headers.get("content-type", "") or "-" - body_len = len(body) - - body_hint = "-" - rate_limited = False - - # Handle text-like responses (HTML, plain text, JSON…) - if ("text" in content_type.lower() or "json" in content_type.lower()) and body: - sample = body[:1200].decode(errors="replace").lower() - - # --- Structured HTML title extraction --- - if "
{message}
" - return ( - f"HTTP/1.1 {status} Error\r\n" - f"Content-Type: text/html\r\n" - f"Content-Length: {len(body)}\r\n" - f"\r\n" - f"{body}" - ).encode() diff --git a/src/relay/fronting_support.py b/src/relay/fronting_support.py new file mode 100644 index 0000000..fae6837 --- /dev/null +++ b/src/relay/fronting_support.py @@ -0,0 +1,146 @@ +""" +Domain-fronting helper utilities: SNI pool building, range-request validation, +progress formatting, and stream spool read/write helpers. + +Extracted from domain_fronter.py to separate pure helper logic from the +DomainFronter class. +""" + +import re +from dataclasses import dataclass + +from core.constants import FRONT_SNI_POOL_GOOGLE + + +__all__ = [ + "HostStat", + "build_sni_pool", + "parse_content_range", + "validate_range_response", + "format_bytes_human", + "format_elapsed_short", + "render_progress_bar", + "progress_line", + "spool_write", + "spool_read", +] + + +@dataclass +class HostStat: + """Per-host traffic accounting — useful for profiling slow / heavy sites.""" + + requests: int = 0 + cache_hits: int = 0 + bytes: int = 0 + total_latency_ns: int = 0 + errors: int = 0 + + +def build_sni_pool(front_domain: str, overrides: list | None) -> list[str]: + """Build the list of SNIs to rotate through on new outbound TLS handshakes.""" + if overrides: + seen: set[str] = set() + out: list[str] = [] + for item in overrides: + host = str(item).strip().lower().rstrip(".") + if host and host not in seen: + seen.add(host) + out.append(host) + if out: + return out + front_domain = (front_domain or "").lower().rstrip(".") + if front_domain.endswith(".google.com") or front_domain == "google.com": + pool = list(FRONT_SNI_POOL_GOOGLE) + if front_domain and front_domain not in pool: + pool.insert(0, front_domain) + return pool + return [front_domain] if front_domain else ["www.google.com"] + + +def parse_content_range(value: str) -> tuple[int, int, int] | None: + match = re.match(r"^\s*bytes\s+(\d+)-(\d+)/(\d+)\s*$", value or "") + 
if not match: + return None + start, end, total = (int(group) for group in match.groups()) + if start < 0 or end < start or total <= end: + return None + return start, end, total + + +def validate_range_response( + status: int, + resp_headers: dict, + body: bytes, + start_off: int, + end_off: int, + total_size: int | None = None, +) -> str | None: + if status != 206: + return f"status {status}" + parsed = parse_content_range(resp_headers.get("content-range", "")) + if not parsed: + return "missing/invalid Content-Range" + got_start, got_end, got_total = parsed + if got_start != start_off or got_end != end_off: + return f"Content-Range mismatch {got_start}-{got_end}" + if total_size is not None and got_total != total_size: + return f"Content-Range total mismatch {got_total}/{total_size}" + expected = end_off - start_off + 1 + if len(body) != expected: + return f"short chunk {len(body)}/{expected} B" + return None + + +def format_bytes_human(num_bytes: int) -> str: + value = float(max(0, num_bytes)) + units = ("B", "KiB", "MiB", "GiB", "TiB") + unit = units[0] + for unit in units: + if value < 1024.0 or unit == units[-1]: + break + value /= 1024.0 + if unit == "B": + return f"{int(value)} {unit}" + return f"{value:.1f} {unit}" + + +def format_elapsed_short(seconds: float) -> str: + total = max(0, int(seconds)) + minutes, secs = divmod(total, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours:02d}:{minutes:02d}:{secs:02d}" + return f"{minutes:02d}:{secs:02d}" + + +def render_progress_bar(done: int, total: int, width: int = 34) -> str: + if total <= 0: + return "[" + ("-" * width) + "]" + ratio = max(0.0, min(1.0, done / total)) + filled = min(width, int(round(ratio * width))) + return "[" + ("#" * filled) + ("-" * (width - filled)) + "]" + + +def progress_line(*, elapsed: float, done: int, total: int, speed_bytes_per_sec: float) -> str: + return ( + f"[{format_elapsed_short(elapsed)}] " + f"{render_progress_bar(done, total)} " + 
f"{format_bytes_human(done)} / {format_bytes_human(total)} " + f"({format_bytes_human(int(speed_bytes_per_sec))}/s)" + ) + + +# ── Parallel-range spool helpers ───────────────────────────────────────────── + +def spool_write(file_obj, offset: int, data: bytes) -> None: + """Write *data* at *offset* in a temp file used for parallel-range spooling.""" + file_obj.seek(offset) + file_obj.write(data) + file_obj.flush() + + +def spool_read(file_obj, offset: int, size: int) -> bytes: + """Read *size* bytes from *offset* in a parallel-range spool file.""" + file_obj.seek(offset) + return file_obj.read(size) diff --git a/src/h2_transport.py b/src/relay/h2_transport.py similarity index 99% rename from src/h2_transport.py rename to src/relay/h2_transport.py index b1d4dc3..dfcdbd6 100644 --- a/src/h2_transport.py +++ b/src/relay/h2_transport.py @@ -25,7 +25,7 @@ try: except Exception: # optional dependency fallback certifi = None -import codec +from core import codec log = logging.getLogger("H2") diff --git a/src/relay/http_reader.py b/src/relay/http_reader.py new file mode 100644 index 0000000..6fc9974 --- /dev/null +++ b/src/relay/http_reader.py @@ -0,0 +1,163 @@ +""" +HTTP/1.1 response reader for keep-alive connections. + +Reads exactly one HTTP response from an asyncio StreamReader, handling +chunked transfer-encoding, Content-Length framing, and streaming bodies. +Auto-decompresses the response body according to the Content-Encoding +header (gzip, deflate, brotli, zstd). + +Usage:: + + status, headers, body = await read_http_response(reader, max_bytes=50_000_000) +""" + +from __future__ import annotations + +import asyncio +import re + +from core import codec + +__all__ = ["read_http_response"] + + +async def read_http_response( + reader: asyncio.StreamReader, + *, + max_bytes: int, +) -> tuple[int, dict[str, str], bytes]: + """Read one HTTP/1.1 response. Keep-alive safe (no read-until-EOF). 
+ + Args: + reader: An ``asyncio.StreamReader`` positioned at the start of + an HTTP response. + max_bytes: Hard cap on the decompressed body size. Raises + ``RuntimeError`` if exceeded. + + Returns: + A ``(status_code, headers, body)`` triple. ``status_code`` is 0 + and the other fields are empty/empty if the response is malformed. + """ + # ── Read until header boundary ──────────────────────────────── + raw = b"" + while b"\r\n\r\n" not in raw: + if len(raw) > 65536: # 64 KB header size limit + return 0, {}, b"" + chunk = await asyncio.wait_for(reader.read(8192), timeout=8) + if not chunk: + break + raw += chunk + + if b"\r\n\r\n" not in raw: + return 0, {}, b"" + + header_section, body = raw.split(b"\r\n\r\n", 1) + lines = header_section.split(b"\r\n") + + status_line = lines[0].decode(errors="replace") + m = re.search(r"\d{3}", status_line) + status = int(m.group()) if m else 0 + + headers: dict[str, str] = {} + for line in lines[1:]: + if b":" in line: + k, v = line.decode(errors="replace").split(":", 1) + headers[k.strip().lower()] = v.strip() + + # ── Body framing ────────────────────────────────────────────── + content_length = headers.get("content-length") + transfer_encoding = headers.get("transfer-encoding", "") + + if "chunked" in transfer_encoding: + body = await _read_chunked(reader, body, max_bytes=max_bytes) + elif content_length: + total = int(content_length) + if total > max_bytes: + raise RuntimeError( + "Relay response exceeds configured size cap " + f"({total} > {max_bytes} bytes)" + ) + remaining = total - len(body) + while remaining > 0: + chunk = await asyncio.wait_for( + reader.read(min(remaining, 65536)), timeout=20 + ) + if not chunk: + break + body += chunk + if len(body) > max_bytes: + raise RuntimeError( + "Relay response exceeded configured size cap while reading body" + ) + remaining -= len(chunk) + else: + # No framing — short timeout read (keep-alive safe) + while True: + try: + chunk = await asyncio.wait_for(reader.read(65536), 
timeout=2) + if not chunk: + break + body += chunk + if len(body) > max_bytes: + raise RuntimeError( + "Relay response exceeded configured size cap while streaming" + ) + except asyncio.TimeoutError: + break + + # ── Auto-decompress ─────────────────────────────────────────── + enc = headers.get("content-encoding", "") + if enc: + body = codec.decode(body, enc) + if len(body) > max_bytes: + raise RuntimeError( + "Decoded relay response exceeded configured size cap" + ) + + return status, headers, body + + +async def _read_chunked( + reader: asyncio.StreamReader, + buf: bytes = b"", + *, + max_bytes: int, +) -> bytes: + """Incrementally read a chunked-transfer-encoded body.""" + result = b"" + while True: + while b"\r\n" not in buf: + data = await asyncio.wait_for(reader.read(8192), timeout=20) + if not data: + return result + buf += data + + end = buf.find(b"\r\n") + size_str = buf[:end].decode(errors="replace").strip() + buf = buf[end + 2:] + + if not size_str: + continue + try: + size = int(size_str, 16) + except ValueError: + break + if size == 0: + break + if size > max_bytes or len(result) + size > max_bytes: + raise RuntimeError( + "Chunked relay response exceeded configured size cap " + f"({max_bytes} bytes)" + ) + + while len(buf) < size + 2: + data = await asyncio.wait_for(reader.read(65536), timeout=20) + if not data: + result += buf[:size] + return result + buf += data + + result += buf[:size] + buf = buf[size + 2:] + + return result diff --git a/src/relay/relay_response.py b/src/relay/relay_response.py new file mode 100644 index 0000000..6caedcb --- /dev/null +++ b/src/relay/relay_response.py @@ -0,0 +1,323 @@ +""" +Apps Script relay response parsing. + +Pure functions for decoding the JSON envelope returned by Code.gs and +reconstructing a standard HTTP response that the proxy can forward to +the client browser. + +Public API +---------- +parse_relay_response(body, max_body_bytes) -> bytes + Top-level entry point: bytes → raw HTTP response bytes. 
+ +split_raw_response(raw) -> (status, headers, body) + Parse a raw HTTP byte string into its parts. + +error_response(status, message) -> bytes + Build a minimal HTML error response. + +classify_relay_error(raw) -> str + Map a raw Apps Script error string to a human-readable explanation. +""" + +import base64 +import codecs +import json +import logging +import re + +log = logging.getLogger("Fronter") + +__all__ = [ + "classify_relay_error", + "error_response", + "split_raw_response", + "split_set_cookie", + "parse_relay_json", + "extract_apps_script_user_html", + "load_relay_json", + "parse_relay_response", +] + + +# ── Apps Script error pattern tables ───────────────────────────────────────── +# Matched against the lower-cased ``e`` field returned by Code.gs. +# Sources: +# • https://developers.google.com/apps-script/guides/support/troubleshooting +# • https://developers.google.com/apps-script/guides/services/quotas + +# "Service invoked too many times for one day: urlfetch." +# "Bandwidth quota exceeded" +_QUOTA_PATTERNS = ( + "service invoked too many times", + "invoked too many times", + "bandwidth quota exceeded", + "too much upload bandwidth", + "too much traffic", + "urlfetch", # appears at end of the daily-quota message in all locales + "quota", + "exceeded", + "daily", + "rate limit", +) + +# "Authorization is required to perform that action." +# "unauthorized" (our own Code.gs response) +_AUTH_PATTERNS = ( + "authorization is required", + "unauthorized", + "not authorized", + "permission denied", + "access denied", +) + +# "Error occurred due to a missing library version or a deployment version. +# Error code Not_Found" +# "script id not found" / wrong Deployment ID +_DEPLOY_PATTERNS = ( + "error code not_found", + "not_found", + "deployment", + "script id", + "scriptid", + "no script", +) + +# "Server not available." / "Server error occurred, please try again." 
+_TRANSIENT_PATTERNS = ( + "server not available", + "server error occurred", + "please try again", + "temporarily unavailable", +) + +# "UrlFetch calls to{message}
" + return ( + f"HTTP/1.1 {status} Error\r\n" + f"Content-Type: text/html\r\n" + f"Content-Length: {len(body)}\r\n" + f"\r\n" + f"{body}" + ).encode() + + +def split_raw_response(raw: bytes): + """Split a raw HTTP response into ``(status, headers_dict, body)``.""" + if b"\r\n\r\n" not in raw: + return 0, {}, raw + header_section, body = raw.split(b"\r\n\r\n", 1) + lines = header_section.split(b"\r\n") + m = re.search(r"\d{3}", lines[0].decode(errors="replace")) + status = int(m.group()) if m else 0 + headers: dict[str, str] = {} + for line in lines[1:]: + if b":" in line: + k, v = line.decode(errors="replace").split(":", 1) + headers[k.strip().lower()] = v.strip() + return status, headers, body + + +def split_set_cookie(blob: str) -> list[str]: + """Split a Set-Cookie string that may contain multiple cookies. + + Apps Script sometimes joins multiple Set-Cookie values with ", ", + which collides with the comma that legitimately appears inside the + ``Expires`` attribute (e.g. "Expires=Wed, 21 Oct 2026 ..."). We split + only on commas that are immediately followed by a cookie name=value + pair, leaving date commas intact. + """ + if not blob: + return [] + parts = re.split(r",\s*(?=[A-Za-z0-9!#$%&'*+\-.^_`|~]+=)", blob) + return [p.strip() for p in parts if p.strip()] + + +# ── JSON → HTTP response ───────────────────────────────────────────────────── + +def parse_relay_json(data: dict, max_body_bytes: int) -> bytes: + """Convert a parsed relay JSON dict to raw HTTP response bytes.""" + if "e" in data: + raw_err = str(data["e"]) + friendly = classify_relay_error(raw_err) + log.warning("Apps Script error — %s | raw: %s", friendly.split(".")[0], raw_err) + return error_response(502, friendly) + + status = data.get("s", 200) + resp_headers = data.get("h", {}) + resp_body = base64.b64decode(data.get("b", "")) + if len(resp_body) > max_body_bytes: + return error_response( + 502, + f"Relay response exceeds cap ({max_body_bytes} bytes). 
" + "Increase max_response_body_bytes if your system has enough RAM.", + ) + + status_text = { + 200: "OK", 206: "Partial Content", + 301: "Moved", 302: "Found", 304: "Not Modified", + 400: "Bad Request", 403: "Forbidden", 404: "Not Found", + 500: "Internal Server Error", + }.get(status, "OK") + result = f"HTTP/1.1 {status} {status_text}\r\n" + + skip = {"transfer-encoding", "connection", "keep-alive", + "content-length", "content-encoding"} + for k, v in resp_headers.items(): + if k.lower() in skip: + continue + # Apps Script returns multi-valued headers (e.g. Set-Cookie) as a + # JavaScript array. Emit each value as its own header line. + # A single string that holds multiple Set-Cookie values joined + # with ", " also needs to be split, otherwise the browser sees + # one malformed cookie and sites like x.com fail. + values = v if isinstance(v, list) else [v] + if k.lower() == "set-cookie": + expanded: list[str] = [] + for item in values: + expanded.extend(split_set_cookie(str(item))) + values = expanded + for val in values: + result += f"{k}: {val}\r\n" + result += f"Content-Length: {len(resp_body)}\r\n" + result += "\r\n" + return result.encode() + resp_body + + +def extract_apps_script_user_html(text: str) -> str | None: + """Extract embedded user HTML from an Apps Script HTML-page response.""" + marker = 'goog.script.init("' + start = text.find(marker) + if start == -1: + return None + start += len(marker) + end = text.find('", "", undefined', start) + if end == -1: + return None + + encoded = text[start:end] + try: + decoded = codecs.decode(encoded, "unicode_escape") + payload = json.loads(decoded) + except Exception: + return None + + user_html = payload.get("userHtml") + return user_html if isinstance(user_html, str) else None + + +def load_relay_json(text: str) -> dict | None: + """Parse a relay JSON body, handling Apps Script HTML wrappers.""" + try: + return json.loads(text) + except json.JSONDecodeError: + wrapped = extract_apps_script_user_html(text) 
+ if wrapped: + data = load_relay_json(wrapped) + if data is not None: + return data + + match = re.search(r'\{.*\}', text, re.DOTALL) + if not match: + return None + try: + data = json.loads(match.group()) + except json.JSONDecodeError: + return None + return data if isinstance(data, dict) else None + + +def parse_relay_response(body: bytes, max_body_bytes: int) -> bytes: + """Parse a raw Apps Script response body into a raw HTTP response. + + ``body`` is the bytes returned over the TLS connection after stripping + the outer HTTP/1.1 response headers. The function: + + 1. Decodes the JSON envelope produced by Code.gs. + 2. Unpacks the nested status / headers / base64-body fields. + 3. Reconstructs a well-formed HTTP/1.1 response suitable for + forwarding directly to the browser. + """ + text = body.decode(errors="replace").strip() + if not text: + return error_response(502, "Empty response from relay") + + data = load_relay_json(text) + if data is None: + return error_response(502, f"No JSON: {text[:200]}") + + return parse_relay_json(data, max_body_bytes)