commit df3c2c5c167a10976861de9e7016a3b6bfddbcd6 Author: Amin.MasterkinG Date: Mon Apr 20 18:44:53 2026 +0330 First commit! (TESTING) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..20bfc3f --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# Secrets & user config +config.json +.env + +# CA certificates (generated at runtime, contain private keys) +ca/ + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +dist/ +build/ +*.egg + +# Virtual environments +venv/ +.venv/ +env/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Temp MITM certs +domainfront_certs_*/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..1772b14 --- /dev/null +++ b/README.md @@ -0,0 +1,222 @@ +# DomainFront Tunnel (MasterHttpRelayVPN Testing Python!) + +A local HTTP proxy that bypasses DPI (Deep Packet Inspection) censorship using **domain fronting**. The proxy tunnels all browser traffic through a CDN — the TLS SNI shows an allowed domain (e.g. `www.google.com`) while the encrypted HTTP Host header routes to your relay endpoint. + +## How It Works + +``` +Browser ──► Local Proxy ──► CDN (TLS SNI: www.google.com) ──► Your Relay ──► Target Website + DPI sees: "www.google.com" ✓ + Actual destination: hidden +``` + +The DPI/firewall only sees the SNI field in the TLS handshake, which shows an innocuous, unblockable domain. The real destination is hidden inside the encrypted HTTP stream. + +## Supported Modes + +| Mode | Relay | Description | +|------|-------|-------------| +| `apps_script` | Google Apps Script | Fronts through `www.google.com` → `script.google.com`. Free, no server needed. | +| `google_fronting` | Google Cloud Run | Fronts through Google IP → your Cloud Run service. | +| `domain_fronting` | Cloudflare Worker | Classic domain fronting via Cloudflare CDN. | +| `custom_domain` | Custom domain on CF | Direct connection to your custom domain on Cloudflare. | + +## Quick Start + +### 1. Install + +```bash +# Clone the repository +git clone https://github.com/masterking32/MasterHttpRelayVPN.git +cd domainfront-tunnel + +# (Optional) Create a virtual environment +python -m venv venv +source venv/bin/activate # Linux/macOS +venv\Scripts\activate # Windows + +# Install dependencies +pip install -r requirements.txt +``` + +> **Note:** Python 3.10+ is required. Core functionality has no external dependencies. The optional packages (`cryptography`, `h2`) enable MITM interception and HTTP/2 multiplexing for the `apps_script` mode. + +### 2. Configure + +```bash +cp config.example.json config.json +``` + +Edit `config.json` with your values: + +```json +{ + "mode": "apps_script", + "google_ip": "216.239.38.120", + "front_domain": "www.google.com", + "script_id": "YOUR_APPS_SCRIPT_DEPLOYMENT_ID", + "auth_key": "your-strong-secret-key", + "listen_host": "127.0.0.1", + "listen_port": 8085, + "log_level": "INFO", + "verify_ssl": true +} +``` + +### 3. Run + +```bash +python main.py +``` + +### 4. Configure Your Browser + +Set your browser's HTTP proxy to `127.0.0.1:8085` (or whatever `listen_host`:`listen_port` you configured). + +For `apps_script` mode, you also need to install the generated CA certificate (`ca/ca.crt`) in your browser's trusted root CAs. + +## Configuration Reference + +### Required Fields + +| Field | Description | +|-------|-------------| +| `mode` | One of: `apps_script`, `google_fronting`, `domain_fronting`, `custom_domain` | +| `auth_key` | Shared secret between the proxy and your relay endpoint | + +### Mode-Specific Fields + +| Field | Modes | Description | +|-------|-------|-------------| +| `script_id` | `apps_script` | Your deployed Apps Script ID (or array of IDs for load balancing) | +| `worker_host` | `domain_fronting`, `google_fronting` | Your Worker/Cloud Run hostname | +| `custom_domain` | `custom_domain` | Your custom domain on Cloudflare | +| `front_domain` | `domain_fronting`, `google_fronting`, `apps_script` | The domain shown in TLS SNI (default: `www.google.com`) | +| `google_ip` | `google_fronting`, `apps_script` | Google IP to connect to (default: `216.239.38.120`) | + +### Optional Fields + +| Field | Default | Description | +|-------|---------|-------------| +| `listen_host` | `127.0.0.1` | Local proxy bind address | +| `listen_port` | `8080` | Local proxy port | +| `log_level` | `INFO` | Logging level: `DEBUG`, `INFO`, `WARNING`, `ERROR` | +| `verify_ssl` | `true` | Verify TLS certificates | +| `worker_path` | `""` | URL path prefix for the worker | +| `script_ids` | — | Array of Apps Script IDs for round-robin load balancing | + +## Environment Variables + +All settings can be overridden via environment variables (useful for containers/CI): + +| Variable | Overrides | +|----------|-----------| +| `DFT_CONFIG` | Config file path | +| `DFT_AUTH_KEY` | `auth_key` | +| `DFT_SCRIPT_ID` | `script_id` | +| `DFT_PORT` | `listen_port` | +| `DFT_HOST` | `listen_host` | +| `DFT_LOG_LEVEL` | `log_level` | + +## CLI Usage + +``` +usage: domainfront-tunnel [-h] [-c CONFIG] [-p PORT] [--host HOST] + [--log-level {DEBUG,INFO,WARNING,ERROR}] [-v] + +options: + -c, --config CONFIG Path to config file (default: config.json) + -p, --port PORT Override listen port + --host HOST Override listen host + --log-level LEVEL Override log level + -v, --version Show version and exit +``` + +### Examples + +```bash +# Basic usage +python main.py + +# Custom config file +python main.py -c /path/to/my-config.json + +# Override port +python main.py -p 9090 + +# Debug logging +python main.py --log-level DEBUG + +# Using environment variables +DFT_AUTH_KEY=my-secret DFT_PORT=9090 python main.py +``` + +## Apps Script Setup + +1. Go to [Google Apps Script](https://script.google.com/) and create a new project. +2. Paste your relay script code into `Code.gs`. +3. Deploy as a **Web App**: + - Execute as: **Me** + - Who has access: **Anyone** +4. Copy the **Deployment ID** and paste it into `config.json` as `script_id`. +5. Set a strong `auth_key` in both the Apps Script and `config.json`. + +### Multiple Script IDs (Load Balancing) + +For higher throughput, deploy multiple copies and use an array: + +```json +{ + "script_ids": [ + "DEPLOYMENT_ID_1", + "DEPLOYMENT_ID_2", + "DEPLOYMENT_ID_3" + ] +} +``` + +## Architecture + +``` +┌─────────┐ ┌──────────────┐ ┌─────────────┐ ┌──────────┐ +│ Browser │────►│ Local Proxy │────►│ CDN / Google │────►│ Relay │──► Internet +│ │◄────│ (this tool) │◄────│ (fronted) │◄────│ Endpoint │◄── +└─────────┘ └──────────────┘ └─────────────┘ └──────────┘ + HTTP/CONNECT TLS (SNI: ok) Fetch target + MITM (optional) Host: relay Return response +``` + +### Key Components + +| File | Purpose | +|------|---------| +| `main.py` | Entry point, config loading, CLI | +| `proxy_server.py` | Local HTTP/CONNECT proxy server | +| `domain_fronter.py` | Domain fronting engine, connection pooling, relay logic | +| `h2_transport.py` | HTTP/2 multiplexed transport (optional, for performance) | +| `mitm.py` | MITM certificate manager for HTTPS interception | +| `ws.py` | WebSocket frame encoder/decoder (RFC 6455) | + +## Performance Features + +- **HTTP/2 multiplexing**: Single TLS connection handles 100+ concurrent requests +- **Connection pooling**: Pre-warmed TLS connection pool with automatic maintenance +- **Request batching**: Groups concurrent requests into single relay calls +- **Request coalescing**: Deduplicates identical concurrent GET requests +- **Parallel range downloads**: Splits large downloads into concurrent chunks +- **Response caching**: LRU cache for static assets (configurable, 50 MB default) + +## Security Notes + +- **Never commit `config.json`** — it contains your `auth_key`. The `.gitignore` excludes it. +- The `ca/` directory contains your generated CA private key. Keep it secure. +- Use a strong, unique `auth_key` to prevent unauthorized use of your relay. +- Set `listen_host` to `127.0.0.1` (not `0.0.0.0`) unless you need LAN access. + +## Special Thanks + +Special thanks to [@abolix](https://github.com/abolix) for making this project possible. + +## License + +MIT diff --git a/config.example.json b/config.example.json new file mode 100644 index 0000000..b02faa6 --- /dev/null +++ b/config.example.json @@ -0,0 +1,12 @@ +{ + "_comment": "Copy this file to config.json and fill in your values", + "mode": "apps_script", + "google_ip": "216.239.38.120", + "front_domain": "www.google.com", + "script_id": "YOUR_APPS_SCRIPT_DEPLOYMENT_ID", + "auth_key": "CHANGE_ME_TO_A_STRONG_SECRET", + "listen_host": "127.0.0.1", + "listen_port": 8085, + "log_level": "INFO", + "verify_ssl": true +} diff --git a/domain_fronter.py b/domain_fronter.py new file mode 100644 index 0000000..0d98768 --- /dev/null +++ b/domain_fronter.py @@ -0,0 +1,1159 @@ +""" +CDN Relay engine. + +Modes: + 1. custom_domain — SNI and Host both point to your custom domain on CF. + 2. domain_fronting — SNI = front_domain (allowed), Host = worker_host. + 3. google_fronting — Connect to Google IP, SNI=google, Host=Cloud Run. + 4. apps_script — Domain fronting via Google Apps Script relay. + POST JSON to script.google.com (fronted through www.google.com). + Apps Script fetches the target URL and returns the response. + +Modes 1-3: + tunnel() — WebSocket-based TCP tunnel (HTTPS / any TCP) + forward() — HTTP request forwarding (plain HTTP) + +Mode 4 (apps_script): + relay() — JSON-based HTTP relay through Apps Script +""" + +import asyncio +import base64 +import gzip +import json +import logging +import os +import re +import ssl +import time +from urllib.parse import urlparse + +from ws import ws_encode, ws_decode + +log = logging.getLogger("Fronter") + + +class DomainFronter: + def __init__(self, config: dict): + mode = config.get("mode", "domain_fronting") + + if mode == "custom_domain": + domain = config["custom_domain"] + self.connect_host = domain + self.sni_host = domain + self.http_host = domain + elif mode == "google_fronting": + self.connect_host = config.get("google_ip", "216.239.38.120") + self.sni_host = config.get("front_domain", "www.google.com") + self.http_host = config["worker_host"] + elif mode == "apps_script": + self.connect_host = config.get("google_ip", "216.239.38.120") + self.sni_host = config.get("front_domain", "www.google.com") + self.http_host = "script.google.com" + # Multi-script round-robin for higher throughput + script = config.get("script_ids") or config.get("script_id") + self._script_ids = script if isinstance(script, list) else [script] + self._script_idx = 0 + self.script_id = self._script_ids[0] # backward compat / logging + self._dev_available = False # True if /dev endpoint works (no redirect, ~400ms faster) + else: + self.connect_host = config["front_domain"] + self.sni_host = config["front_domain"] + self.http_host = config["worker_host"] + + self.mode = mode + self.worker_path = config.get("worker_path", "") + self.auth_key = config.get("auth_key", "") + self.verify_ssl = config.get("verify_ssl", True) + + # Connection pool — TTL-based, pre-warmed, with concurrency control + self._pool: list[tuple[asyncio.StreamReader, asyncio.StreamWriter, float]] = [] + self._pool_lock = asyncio.Lock() + self._pool_max = 50 + self._conn_ttl = 45.0 # seconds before a pooled conn is discarded + self._semaphore = asyncio.Semaphore(50) # max concurrent relay connections + self._warmed = False + self._refilling = False # background pool refill in progress + self._pool_min_idle = 15 # maintain at least this many idle connections + self._maintenance_task: asyncio.Task | None = None + + # Batch collector for grouping concurrent relay() calls + self._batch_lock = asyncio.Lock() + self._batch_pending: list[tuple[dict, asyncio.Future]] = [] + self._batch_task: asyncio.Task | None = None + self._batch_window_micro = 0.005 # 5ms micro-window (single req) + self._batch_window_macro = 0.050 # 50ms macro-window (burst traffic) + self._batch_max = 50 # max requests per batch + self._batch_enabled = True # disabled on first batch API failure + + # Request coalescing — dedup concurrent identical GETs + self._coalesce: dict[str, list[asyncio.Future]] = {} + + # HTTP/2 multiplexing — one connection handles all requests + self._h2 = None + if mode == "apps_script": + try: + from h2_transport import H2Transport, H2_AVAILABLE + if H2_AVAILABLE: + self._h2 = H2Transport( + self.connect_host, self.sni_host, self.verify_ssl + ) + log.info("HTTP/2 multiplexing available — " + "all requests will share one connection") + except ImportError: + pass + + # ── helpers ─────────────────────────────────────────────────── + + def _ssl_ctx(self) -> ssl.SSLContext: + ctx = ssl.create_default_context() + if not self.verify_ssl: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + async def _open(self): + """Open a TLS connection to the CDN. + + The *server_hostname* parameter sets the **TLS SNI** extension. + DPI systems see only this value. + """ + return await asyncio.open_connection( + self.connect_host, + 443, + ssl=self._ssl_ctx(), + server_hostname=self.sni_host, + ) + + async def _acquire(self): + """Get a healthy TLS connection from pool (TTL-checked) or open new.""" + now = asyncio.get_event_loop().time() + async with self._pool_lock: + while self._pool: + reader, writer, created = self._pool.pop() + if (now - created) < self._conn_ttl and not reader.at_eof(): + # Eagerly replace the connection we just took + asyncio.create_task(self._add_conn_to_pool()) + return reader, writer, created + try: + writer.close() + except Exception: + pass + reader, writer = await asyncio.wait_for(self._open(), timeout=10) + # Pool was empty — trigger aggressive background refill + if not self._refilling: + self._refilling = True + asyncio.create_task(self._refill_pool()) + return reader, writer, asyncio.get_event_loop().time() + + async def _release(self, reader, writer, created): + """Return a connection to the pool if still young and healthy.""" + now = asyncio.get_event_loop().time() + if (now - created) >= self._conn_ttl or reader.at_eof(): + try: + writer.close() + except Exception: + pass + return + async with self._pool_lock: + if len(self._pool) < self._pool_max: + self._pool.append((reader, writer, created)) + else: + try: + writer.close() + except Exception: + pass + + def _next_script_id(self) -> str: + """Round-robin across script IDs for load distribution.""" + sid = self._script_ids[self._script_idx % len(self._script_ids)] + self._script_idx += 1 + return sid + + def _exec_path(self) -> str: + """Get the next Apps Script endpoint path (/dev or /exec).""" + sid = self._next_script_id() + return f"/macros/s/{sid}/{'dev' if self._dev_available else 'exec'}" + + async def _flush_pool(self): + """Close all pooled connections (they may be stale after errors).""" + async with self._pool_lock: + for _, writer, _ in self._pool: + try: + writer.close() + except Exception: + pass + self._pool.clear() + + async def _refill_pool(self): + """Background: open connections in parallel to refill empty pool.""" + try: + coros = [self._add_conn_to_pool() for _ in range(8)] + await asyncio.gather(*coros, return_exceptions=True) + finally: + self._refilling = False + + async def _add_conn_to_pool(self): + """Open one TLS connection and add it to the pool.""" + try: + r, w = await asyncio.wait_for(self._open(), timeout=5) + t = asyncio.get_event_loop().time() + async with self._pool_lock: + if len(self._pool) < self._pool_max: + self._pool.append((r, w, t)) + else: + try: + w.close() + except Exception: + pass + except Exception: + pass + + async def _pool_maintenance(self): + """Continuously maintain healthy pool levels in background.""" + while True: + try: + await asyncio.sleep(3) + now = asyncio.get_event_loop().time() + + # Purge expired / dead connections + async with self._pool_lock: + alive = [] + for r, w, t in self._pool: + if (now - t) < self._conn_ttl and not r.at_eof(): + alive.append((r, w, t)) + else: + try: + w.close() + except Exception: + pass + self._pool = alive + idle = len(self._pool) + + # Refill if below minimum idle threshold + needed = max(0, self._pool_min_idle - idle) + if needed > 0: + coros = [self._add_conn_to_pool() + for _ in range(min(needed, 5))] + await asyncio.gather(*coros, return_exceptions=True) + + except asyncio.CancelledError: + break + except Exception: + pass + + async def _warm_pool(self): + """Pre-open TLS connections in the background. Never blocks relay().""" + if self._warmed: + return + self._warmed = True + asyncio.create_task(self._do_warm()) + # Start continuous pool maintenance + if self._maintenance_task is None: + self._maintenance_task = asyncio.create_task(self._pool_maintenance()) + # Start H2 connection (runs alongside H1 pool) + if self._h2: + asyncio.create_task(self._h2_connect_and_warm()) + + async def _h2_connect(self): + """Connect the HTTP/2 transport in background.""" + try: + await self._h2.ensure_connected() + log.info("H2 multiplexing active — one conn handles all requests") + except Exception as e: + log.warning("H2 connect failed (%s), using H1 pool fallback", e) + + async def _h2_connect_and_warm(self): + """Connect H2, pre-warm the Apps Script container, start keepalive.""" + await self._h2_connect() + if self._h2 and self._h2.is_connected: + asyncio.create_task(self._prewarm_script()) + asyncio.create_task(self._keepalive_loop()) + + async def _prewarm_script(self): + """Pre-warm Apps Script and detect /dev fast path (no redirect).""" + payload = json.dumps( + {"m": "HEAD", "u": "http://example.com/", "k": self.auth_key} + ).encode() + hdrs = {"content-type": "application/json"} + sid = self._script_ids[0] + + # Test /dev endpoint — returns data inline (no 302 redirect). + # If it works, saves ~400ms per request by eliminating one round trip. + try: + dev_path = f"/macros/s/{sid}/dev" + t0 = time.perf_counter() + status, _, body = await asyncio.wait_for( + self._h2.request( + method="POST", path=dev_path, host=self.http_host, + headers=hdrs, body=payload, + ), + timeout=15, + ) + dt = (time.perf_counter() - t0) * 1000 + data = json.loads(body.decode(errors="replace")) + if "s" in data: + self._dev_available = True + log.info("/dev fast path active (%.0fms, no redirect)", dt) + return + except Exception as e: + log.debug("/dev test failed: %s", e) + + # Fallback: warm up with /exec + try: + exec_path = f"/macros/s/{sid}/exec" + t0 = time.perf_counter() + await asyncio.wait_for( + self._h2.request( + method="POST", path=exec_path, host=self.http_host, + headers=hdrs, body=payload, + ), + timeout=15, + ) + dt = (time.perf_counter() - t0) * 1000 + log.info("Apps Script pre-warmed in %.0fms", dt) + except Exception as e: + log.debug("Pre-warm failed: %s", e) + + async def _keepalive_loop(self): + """Send periodic pings to keep Apps Script warm + H2 connection alive.""" + while True: + try: + await asyncio.sleep(180) # 3 minutes (ahead of Google's ~4min timeout) + if not self._h2 or not self._h2.is_connected: + try: + await self._h2.reconnect() + except Exception: + continue + + # H2 PING to keep connection alive + await self._h2.ping() + + # Apps Script keepalive — warm the container + payload = {"m": "HEAD", "u": "http://example.com/", "k": self.auth_key} + path = self._exec_path() + t0 = time.perf_counter() + await asyncio.wait_for( + self._h2.request( + method="POST", path=path, host=self.http_host, + headers={"content-type": "application/json"}, + body=json.dumps(payload).encode(), + ), + timeout=20, + ) + dt = (time.perf_counter() - t0) * 1000 + log.debug("Keepalive ping: %.0fms", dt) + except asyncio.CancelledError: + break + except Exception as e: + log.debug("Keepalive failed: %s", e) + + async def _do_warm(self): + """Open connections in parallel — failures are fine.""" + count = 30 + coros = [self._add_conn_to_pool() for _ in range(count)] + results = await asyncio.gather(*coros, return_exceptions=True) + opened = sum(1 for r in results if not isinstance(r, Exception)) + log.info("Pre-warmed %d/%d TLS connections", opened, count) + + def _auth_header(self) -> str: + return f"X-Auth-Key: {self.auth_key}\r\n" if self.auth_key else "" + + # ── WebSocket tunnel (CONNECT / HTTPS) ──────────────────────── + + async def tunnel(self, target_host: str, target_port: int, + client_r: asyncio.StreamReader, + client_w: asyncio.StreamWriter): + """Tunnel raw TCP bytes through a domain-fronted WebSocket.""" + try: + remote_r, remote_w = await self._open() + except Exception as e: + log.error("TLS connect to %s failed: %s", self.connect_host, e) + return + + try: + # ---- WebSocket upgrade ---- + ws_key = base64.b64encode(os.urandom(16)).decode() + path = f"{self.worker_path}/tunnel?host={target_host}&port={target_port}" + handshake = ( + f"GET {path} HTTP/1.1\r\n" + f"Host: {self.http_host}\r\n" + f"Upgrade: websocket\r\n" + f"Connection: Upgrade\r\n" + f"Sec-WebSocket-Key: {ws_key}\r\n" + f"Sec-WebSocket-Version: 13\r\n" + f"{self._auth_header()}" + f"\r\n" + ) + remote_w.write(handshake.encode()) + await remote_w.drain() + + # Read the 101 Switching Protocols response + resp = b"" + while b"\r\n\r\n" not in resp: + chunk = await asyncio.wait_for(remote_r.read(4096), timeout=15) + if not chunk: + raise ConnectionError("No WebSocket handshake response") + resp += chunk + + status_line = resp.split(b"\r\n")[0] + if b"101" not in status_line: + raise ConnectionError( + f"WebSocket upgrade rejected: {status_line.decode(errors='replace')}" + ) + + log.info("Tunnel ready → %s:%d", target_host, target_port) + + # ---- bidirectional relay ---- + await asyncio.gather( + self._client_to_ws(client_r, remote_w), + self._ws_to_client(remote_r, client_w), + ) + + except Exception as e: + log.error("Tunnel error (%s:%d): %s", target_host, target_port, e) + finally: + try: + remote_w.close() + except Exception: + pass + + async def _client_to_ws(self, src: asyncio.StreamReader, + dst: asyncio.StreamWriter): + """Read plaintext from the browser, wrap in WS frames, send to CDN.""" + try: + while True: + data = await src.read(16384) + if not data: + # Send a WS close frame + dst.write(ws_encode(b"", opcode=0x08)) + await dst.drain() + break + dst.write(ws_encode(data)) + await dst.drain() + except (ConnectionError, asyncio.CancelledError): + pass + + async def _ws_to_client(self, src: asyncio.StreamReader, + dst: asyncio.StreamWriter): + """Read WS frames from CDN, unwrap, write plaintext to browser.""" + buf = b"" + try: + while True: + chunk = await src.read(16384) + if not chunk: + break + buf += chunk + while buf: + result = ws_decode(buf) + if result is None: + break # need more data + opcode, payload, consumed = result + buf = buf[consumed:] + if opcode == 0x08: # close + return + if payload: + dst.write(payload) + await dst.drain() + except (ConnectionError, asyncio.CancelledError): + pass + + # ── HTTP forwarding ─────────────────────────────────────────── + + async def forward(self, raw_request: bytes) -> bytes: + """Forward a plain HTTP request through the domain-fronted channel. + + Uses keep-alive connections from the pool for efficiency. + """ + try: + reader, writer, created = await self._acquire() + + # Wrap the original HTTP request inside a POST to the worker. + request = ( + f"POST {self.worker_path}/forward HTTP/1.1\r\n" + f"Host: {self.http_host}\r\n" + f"Content-Type: application/octet-stream\r\n" + f"Content-Length: {len(raw_request)}\r\n" + f"Connection: keep-alive\r\n" + f"{self._auth_header()}" + f"\r\n" + ) + writer.write(request.encode() + raw_request) + await writer.drain() + + status, resp_headers, resp_body = await self._read_http_response(reader) + + await self._release(reader, writer, created) + + # The worker wraps the target's response in its own HTTP + # envelope. The body IS the raw HTTP response from the target. + return resp_body + + except Exception as e: + log.error("Forward failed: %s", e) + return b"HTTP/1.1 502 Bad Gateway\r\n\r\nDomain fronting request failed\r\n" + + # ── Apps Script relay (apps_script mode) ────────────────────── + + async def relay(self, method: str, url: str, + headers: dict, body: bytes = b"") -> bytes: + """Relay an HTTP request through Apps Script. + + Features: + - Pre-warms TLS connections on first call + - Coalesces concurrent identical GET requests + - Batches concurrent calls via fetchAll() (40ms window) + - Retries once on connection failure + - Concurrency-limited via semaphore + + Returns a raw HTTP response (status + headers + body). + """ + if not self._warmed: + await self._warm_pool() + + payload = self._build_payload(method, url, headers, body) + + # Coalesce concurrent GETs for the same URL. + # CRITICAL: do NOT coalesce when a Range header is present — + # parallel range downloads MUST each hit the server independently. + has_range = False + if headers: + for k in headers: + if k.lower() == "range": + has_range = True + break + if method == "GET" and not body and not has_range: + return await self._coalesced_submit(url, payload) + + return await self._batch_submit(payload) + + async def _coalesced_submit(self, url: str, payload: dict) -> bytes: + """Dedup concurrent requests for the same URL (no Range header).""" + if url in self._coalesce: + # Another task is already fetching this URL — wait for it + future = asyncio.get_event_loop().create_future() + self._coalesce[url].append(future) + log.debug("Coalesced request: %s", url[:60]) + return await future + + self._coalesce[url] = [] + try: + result = await self._batch_submit(payload) + # Resolve all waiters + for f in self._coalesce.get(url, []): + if not f.done(): + f.set_result(result) + return result + except Exception as e: + for f in self._coalesce.get(url, []): + if not f.done(): + f.set_exception(e) + raise + finally: + self._coalesce.pop(url, None) + + async def relay_parallel(self, method: str, url: str, + headers: dict, body: bytes = b"", + chunk_size: int = 256 * 1024, + max_parallel: int = 16) -> bytes: + """Relay with parallel range acceleration for large downloads. + + Strategy: + 1. Send initial GET with Range: bytes=0- + 2. If target returns 206 (supports ranges), fetch remaining + chunks concurrently via HTTP/2 multiplexing. + 3. If target returns 200 (no range support) or small file, + return the single response. + + Since each Apps Script call takes ~2s regardless of payload size, + we use: + - 256 KB chunks (safe under Apps Script response limit) + - Up to 16 chunks in flight at once via H2 multiplexing + - Aggregate throughput of ~2 MB per round-trip (~2-3s) + """ + if method != "GET" or body: + return await self.relay(method, url, headers, body) + + # Probe: first chunk with Range header + range_headers = dict(headers) if headers else {} + range_headers["Range"] = f"bytes=0-{chunk_size - 1}" + first_resp = await self.relay("GET", url, range_headers, b"") + + status, resp_hdrs, resp_body = self._split_raw_response(first_resp) + + # No range support → return the single response as-is + if status != 206: + return first_resp + + # Parse total size from Content-Range: "bytes 0-262143/1048576" + content_range = resp_hdrs.get("content-range", "") + m = re.search(r"/(\d+)", content_range) + if not m: + return first_resp + total_size = int(m.group(1)) + + # Small file: probe already fetched it all + if total_size <= chunk_size or len(resp_body) >= total_size: + return first_resp + + # Calculate remaining ranges + ranges = [] + start = len(resp_body) + while start < total_size: + end = min(start + chunk_size - 1, total_size - 1) + ranges.append((start, end)) + start = end + 1 + + log.info("Parallel download: %d bytes, %d chunks of %d KB", + total_size, len(ranges) + 1, chunk_size // 1024) + + # Concurrency-limited parallel fetch + sem = asyncio.Semaphore(max_parallel) + + async def fetch_range(s, e, max_tries: int = 3): + async with sem: + rh_base = dict(headers) if headers else {} + rh_base["Range"] = f"bytes={s}-{e}" + expected = e - s + 1 + last_err = None + for attempt in range(max_tries): + try: + raw = await self.relay("GET", url, rh_base, b"") + _, _, chunk_body = self._split_raw_response(raw) + if len(chunk_body) == expected: + return chunk_body + last_err = ( + f"short chunk {len(chunk_body)}/{expected} B" + ) + except Exception as e_: + last_err = repr(e_) + log.warning("Range %d-%d retry %d/%d: %s", + s, e, attempt + 1, max_tries, last_err) + await asyncio.sleep(0.3 * (attempt + 1)) + raise RuntimeError( + f"chunk {s}-{e} failed after {max_tries} tries: {last_err}" + ) + + t0 = asyncio.get_event_loop().time() + results = await asyncio.gather( + *[fetch_range(s, e) for s, e in ranges], + return_exceptions=True, + ) + elapsed = asyncio.get_event_loop().time() - t0 + + # Assemble full body + parts = [resp_body] + for i, r in enumerate(results): + if isinstance(r, Exception): + log.error("Range chunk %d failed: %s", i, r) + return self._error_response(502, f"Parallel download failed: {r}") + parts.append(r) + + full_body = b"".join(parts) + kbs = (len(full_body) / 1024) / elapsed if elapsed > 0 else 0 + log.info("Parallel download complete: %d B in %.2fs = %.1f KB/s", + len(full_body), elapsed, kbs) + + # Return as 200 OK (client sent a normal GET) + result = f"HTTP/1.1 200 OK\r\n" + skip = {"transfer-encoding", "connection", "keep-alive", + "content-length", "content-encoding", "content-range"} + for k, v in resp_hdrs.items(): + if k.lower() not in skip: + result += f"{k}: {v}\r\n" + result += f"Content-Length: {len(full_body)}\r\n" + result += "\r\n" + return result.encode() + full_body + + def _build_payload(self, method, url, headers, body): + """Build the JSON relay payload dict.""" + payload = { + "m": method, + "u": url, + "r": True, + } + if headers: + # Strip Accept-Encoding: Apps Script auto-decompresses gzip + # but NOT brotli/zstd — forwarding "br" causes garbled responses. + filt = {k: v for k, v in headers.items() + if k.lower() != "accept-encoding"} + payload["h"] = filt if filt else headers + if body: + payload["b"] = base64.b64encode(body).decode() + ct = headers.get("Content-Type") or headers.get("content-type") + if ct: + payload["ct"] = ct + return payload + + # ── Batch collector ─────────────────────────────────────────── + + async def _batch_submit(self, payload: dict) -> bytes: + """Submit a request to the batch collector. Returns raw HTTP response.""" + # If batching is disabled (old Code.gs), go direct + if not self._batch_enabled: + return await self._relay_with_retry(payload) + + future = asyncio.get_event_loop().create_future() + + async with self._batch_lock: + self._batch_pending.append((payload, future)) + + if len(self._batch_pending) >= self._batch_max: + # Batch is full — flush now + batch = self._batch_pending[:] + self._batch_pending.clear() + if self._batch_task and not self._batch_task.done(): + self._batch_task.cancel() + self._batch_task = None + asyncio.create_task(self._batch_send(batch)) + elif self._batch_task is None or self._batch_task.done(): + # First request in a new batch window — start timer + self._batch_task = asyncio.create_task(self._batch_timer()) + + return await future + + async def _batch_timer(self): + """Two-tier batch window: 5ms micro + 45ms macro. + + Single requests (link clicks) get only 5ms delay. + Burst traffic (page sub-resources, range chunks) gets a 50ms + window to accumulate, enabling much larger batches. + """ + # Tier 1: micro-window — detect if burst or single + await asyncio.sleep(self._batch_window_micro) + async with self._batch_lock: + if len(self._batch_pending) <= 1: + # Single request — send immediately (only 5ms delay) + if self._batch_pending: + batch = self._batch_pending[:] + self._batch_pending.clear() + self._batch_task = None + asyncio.create_task(self._batch_send(batch)) + return + + # Tier 2: burst detected — wait more to accumulate + await asyncio.sleep(self._batch_window_macro - self._batch_window_micro) + async with self._batch_lock: + if self._batch_pending: + batch = self._batch_pending[:] + self._batch_pending.clear() + self._batch_task = None + asyncio.create_task(self._batch_send(batch)) + + async def _batch_send(self, batch: list): + """Send a batch of requests. Uses fetchAll for multi, single for one.""" + if len(batch) == 1: + payload, future = batch[0] + try: + result = await self._relay_with_retry(payload) + if not future.done(): + future.set_result(result) + except Exception as e: + if not future.done(): + future.set_result(self._error_response(502, str(e))) + else: + log.info("Batch relay: %d requests", len(batch)) + try: + results = await self._relay_batch([p for p, _ in batch]) + for (_, future), result in zip(batch, results): + if not future.done(): + future.set_result(result) + except Exception as e: + log.warning("Batch relay failed, disabling batch mode. " + "Redeploy Code.gs for batch support. Error: %s", e) + self._batch_enabled = False + # Fallback: send individually + tasks = [] + for payload, future in batch: + tasks.append(self._relay_fallback(payload, future)) + await asyncio.gather(*tasks) + + async def _relay_fallback(self, payload, future): + """Fallback: relay a single request from a failed batch.""" + try: + result = await self._relay_with_retry(payload) + if not future.done(): + future.set_result(result) + except Exception as e: + if not future.done(): + future.set_result(self._error_response(502, str(e))) + + # ── Core relay with retry ───────────────────────────────────── + + async def _relay_with_retry(self, payload: dict) -> bytes: + """Single relay with one retry on failure. Uses H2 if available.""" + # Try HTTP/2 first — much faster (multiplexed, no pool checkout) + if self._h2 and self._h2.is_connected: + for attempt in range(2): + try: + return await asyncio.wait_for( + self._relay_single_h2(payload), timeout=25 + ) + except Exception as e: + if attempt == 0: + log.debug("H2 relay failed (%s), reconnecting", e) + try: + await self._h2.reconnect() + except Exception: + log.warning("H2 reconnect failed, falling back to H1") + break + else: + raise + + # HTTP/1.1 fallback (pool-based) + async with self._semaphore: + for attempt in range(2): + try: + return await asyncio.wait_for( + self._relay_single(payload), timeout=25 + ) + except Exception as e: + if attempt == 0: + log.debug("Relay attempt 1 failed (%s: %s), retrying", + type(e).__name__, e) + await self._flush_pool() + else: + raise + + async def _relay_single_h2(self, payload: dict) -> bytes: + """Execute a relay through HTTP/2 multiplexing. + + Uses the shared H2 connection — no pool checkout needed. + Many concurrent calls all share one TLS connection. + """ + full_payload = dict(payload) + full_payload["k"] = self.auth_key + json_body = json.dumps(full_payload).encode() + + path = self._exec_path() + + status, headers, body = await self._h2.request( + method="POST", path=path, host=self.http_host, + headers={"content-type": "application/json"}, + body=json_body, + ) + + return self._parse_relay_response(body) + + async def _relay_single(self, payload: dict) -> bytes: + """Execute a single relay POST → redirect → parse.""" + # Add auth key + full_payload = dict(payload) + full_payload["k"] = self.auth_key + json_body = json.dumps(full_payload).encode() + + path = self._exec_path() + reader, writer, created = await self._acquire() + + try: + request = ( + f"POST {path} HTTP/1.1\r\n" + f"Host: {self.http_host}\r\n" + f"Content-Type: application/json\r\n" + f"Content-Length: {len(json_body)}\r\n" + f"Accept-Encoding: gzip\r\n" + f"Connection: keep-alive\r\n" + f"\r\n" + ) + writer.write(request.encode() + json_body) + await writer.drain() + + status, resp_headers, resp_body = await self._read_http_response(reader) + + # Follow redirect chain on the SAME connection + for _ in range(5): + if status not in (301, 302, 303, 307, 308): + break + location = resp_headers.get("location") + if not location: + break + + parsed = urlparse(location) + rpath = parsed.path + ("?" + parsed.query if parsed.query else "") + request = ( + f"GET {rpath} HTTP/1.1\r\n" + f"Host: {parsed.netloc}\r\n" + f"Accept-Encoding: gzip\r\n" + f"Connection: keep-alive\r\n" + f"\r\n" + ) + writer.write(request.encode()) + await writer.drain() + status, resp_headers, resp_body = await self._read_http_response(reader) + + await self._release(reader, writer, created) + return self._parse_relay_response(resp_body) + + except Exception: + try: + writer.close() + except Exception: + pass + raise + + async def _relay_batch(self, payloads: list[dict]) -> list[bytes]: + """Send multiple requests in one POST using Apps Script fetchAll.""" + batch_payload = { + "k": self.auth_key, + "q": payloads, + } + json_body = json.dumps(batch_payload).encode() + path = self._exec_path() + + # Try HTTP/2 first + if self._h2 and self._h2.is_connected: + try: + status, headers, body = await asyncio.wait_for( + self._h2.request( + method="POST", path=path, host=self.http_host, + headers={"content-type": "application/json"}, + body=json_body, + ), + timeout=30, + ) + return self._parse_batch_body(body, payloads) + except Exception as e: + log.debug("H2 batch failed (%s), falling back to H1", e) + + # HTTP/1.1 fallback + async with self._semaphore: + reader, writer, created = await self._acquire() + try: + request = ( + f"POST {path} HTTP/1.1\r\n" + f"Host: {self.http_host}\r\n" + f"Content-Type: application/json\r\n" + f"Content-Length: {len(json_body)}\r\n" + f"Accept-Encoding: gzip\r\n" + f"Connection: keep-alive\r\n" + f"\r\n" + ) + writer.write(request.encode() + json_body) + await writer.drain() + + status, resp_headers, resp_body = await self._read_http_response(reader) + + # Follow redirects + for _ in range(5): + if status not in (301, 302, 303, 307, 308): + break + location = resp_headers.get("location") + if not location: + break + parsed = urlparse(location) + rpath = parsed.path + ("?" + parsed.query if parsed.query else "") + request = ( + f"GET {rpath} HTTP/1.1\r\n" + f"Host: {parsed.netloc}\r\n" + f"Accept-Encoding: gzip\r\n" + f"Connection: keep-alive\r\n" + f"\r\n" + ) + writer.write(request.encode()) + await writer.drain() + status, resp_headers, resp_body = await self._read_http_response(reader) + + await self._release(reader, writer, created) + + except Exception: + try: + writer.close() + except Exception: + pass + raise + + return self._parse_batch_body(resp_body, payloads) + + def _parse_batch_body(self, resp_body: bytes, + payloads: list[dict]) -> list[bytes]: + """Parse a batch response body into individual results.""" + text = resp_body.decode(errors="replace").strip() + try: + data = json.loads(text) + except json.JSONDecodeError: + m = re.search(r'\{.*\}', text, re.DOTALL) + data = json.loads(m.group()) if m else None + if not data: + raise RuntimeError(f"Bad batch response: {text[:200]}") + + if "e" in data: + raise RuntimeError(f"Batch error: {data['e']}") + + items = data.get("q", []) + if len(items) != len(payloads): + raise RuntimeError( + f"Batch size mismatch: {len(items)} vs {len(payloads)}" + ) + + results = [] + for item in items: + results.append(self._parse_relay_json(item)) + return results + + # ── HTTP response reading (keep-alive safe) ────────────────── + + async def _read_http_response(self, reader: asyncio.StreamReader): + """Read one HTTP response. Keep-alive safe (no read-until-EOF).""" + raw = b"" + while b"\r\n\r\n" not in raw: + chunk = await asyncio.wait_for(reader.read(8192), timeout=8) + if not chunk: + break + raw += chunk + + if b"\r\n\r\n" not in raw: + return 0, {}, b"" + + header_section, body = raw.split(b"\r\n\r\n", 1) + lines = header_section.split(b"\r\n") + + status_line = lines[0].decode(errors="replace") + m = re.search(r"\d{3}", status_line) + status = int(m.group()) if m else 0 + + headers = {} + for line in lines[1:]: + if b":" in line: + k, v = line.decode(errors="replace").split(":", 1) + headers[k.strip().lower()] = v.strip() + + content_length = headers.get("content-length") + transfer_encoding = headers.get("transfer-encoding", "") + + if "chunked" in transfer_encoding: + body = await self._read_chunked(reader, body) + elif content_length: + remaining = int(content_length) - len(body) + while remaining > 0: + chunk = await asyncio.wait_for( + reader.read(min(remaining, 65536)), timeout=20 + ) + if not chunk: + break + body += chunk + remaining -= len(chunk) + else: + # No framing — short timeout read (keep-alive safe) + while True: + try: + chunk = await asyncio.wait_for(reader.read(65536), timeout=2) + if not chunk: + break + body += chunk + except asyncio.TimeoutError: + break + + # Auto-decompress gzip from Google frontend + if headers.get("content-encoding", "").lower() == "gzip": + try: + body = gzip.decompress(body) + except Exception: + pass # not actually gzip, use as-is + + return status, headers, body + + async def _read_chunked(self, reader, buf=b""): + """Incrementally read chunked transfer-encoding.""" + result = b"" + while True: + while b"\r\n" not in buf: + data = await asyncio.wait_for(reader.read(8192), timeout=20) + if not data: + return result + buf += data + + end = buf.find(b"\r\n") + size_str = buf[:end].decode(errors="replace").strip() + buf = buf[end + 2:] + + if not size_str: + continue + try: + size = int(size_str, 16) + except ValueError: + break + if size == 0: + break + + while len(buf) < size + 2: + data = await asyncio.wait_for(reader.read(65536), timeout=20) + if not data: + result += buf[:size] + return result + buf += data + + result += buf[:size] + buf = buf[size + 2:] + + return result + + # ── Response parsing ────────────────────────────────────────── + + def _parse_relay_response(self, body: bytes) -> bytes: + """Parse JSON from Apps Script and reconstruct an HTTP response.""" + text = body.decode(errors="replace").strip() + if not text: + return self._error_response(502, "Empty response from relay") + + try: + data = json.loads(text) + except json.JSONDecodeError: + m = re.search(r'\{.*\}', text, re.DOTALL) + if m: + try: + data = json.loads(m.group()) + except json.JSONDecodeError: + return self._error_response(502, f"Bad JSON: {text[:200]}") + else: + return self._error_response(502, f"No JSON: {text[:200]}") + + return self._parse_relay_json(data) + + def _parse_relay_json(self, data: dict) -> bytes: + """Convert a parsed relay JSON dict to raw HTTP response bytes.""" + if "e" in data: + return self._error_response(502, f"Relay error: {data['e']}") + + status = data.get("s", 200) + resp_headers = data.get("h", {}) + resp_body = base64.b64decode(data.get("b", "")) + + status_text = {200: "OK", 206: "Partial Content", + 301: "Moved", 302: "Found", 304: "Not Modified", + 400: "Bad Request", 403: "Forbidden", 404: "Not Found", + 500: "Internal Server Error"}.get(status, "OK") + result = f"HTTP/1.1 {status} {status_text}\r\n" + + skip = {"transfer-encoding", "connection", "keep-alive", + "content-length", "content-encoding"} + for k, v in resp_headers.items(): + if k.lower() not in skip: + result += f"{k}: {v}\r\n" + result += f"Content-Length: {len(resp_body)}\r\n" + result += "\r\n" + return result.encode() + resp_body + + def _split_raw_response(self, raw: bytes): + """Split a raw HTTP response into (status, headers_dict, body).""" + if b"\r\n\r\n" not in raw: + return 0, {}, raw + header_section, body = raw.split(b"\r\n\r\n", 1) + lines = header_section.split(b"\r\n") + m = re.search(r"\d{3}", lines[0].decode(errors="replace")) + status = int(m.group()) if m else 0 + headers = {} + for line in lines[1:]: + if b":" in line: + k, v = line.decode(errors="replace").split(":", 1) + headers[k.strip().lower()] = v.strip() + return status, headers, body + + def _error_response(self, status: int, message: str) -> bytes: + body = f"

{status}

{message}

" + return ( + f"HTTP/1.1 {status} Error\r\n" + f"Content-Type: text/html\r\n" + f"Content-Length: {len(body)}\r\n" + f"\r\n" + f"{body}" + ).encode() diff --git a/h2_transport.py b/h2_transport.py new file mode 100644 index 0000000..3557840 --- /dev/null +++ b/h2_transport.py @@ -0,0 +1,419 @@ +""" +HTTP/2 multiplexed transport for domain-fronted connections. + +One TLS connection → many concurrent HTTP/2 streams → massive throughput. +Eliminates per-request TLS handshake overhead entirely. + +Instead of a pool of 30 HTTP/1.1 connections (each handling 1 request), +this uses a SINGLE HTTP/2 connection handling 100+ concurrent requests. + +Performance comparison: + HTTP/1.1 pool: 30 connections × 1 request = 30 concurrent requests max + HTTP/2 mux: 1 connection × 100 streams = 100 concurrent requests + +Requires: pip install h2 +""" + +import asyncio +import gzip +import logging +import socket +import ssl +from urllib.parse import urlparse + +log = logging.getLogger("H2") + +try: + import h2.connection + import h2.config + import h2.events + import h2.settings + H2_AVAILABLE = True +except ImportError: + H2_AVAILABLE = False + + +class _StreamState: + """State for a single in-flight HTTP/2 stream.""" + __slots__ = ("status", "headers", "data", "done", "error") + + def __init__(self): + self.status = 0 + self.headers: dict[str, str] = {} + self.data = bytearray() + self.done = asyncio.Event() + self.error: str | None = None + + +class H2Transport: + """ + Persistent HTTP/2 connection with automatic stream multiplexing. + + All relay requests share ONE TLS connection. Each request becomes + an independent HTTP/2 stream, running fully concurrently. + + Features: + - Auto-connect on first use + - Auto-reconnect on connection loss + - Redirect following (as new streams, same connection) + - Gzip decompression + - Configurable max concurrency + """ + + def __init__(self, connect_host: str, sni_host: str, + verify_ssl: bool = True): + self.connect_host = connect_host + self.sni_host = sni_host + self.verify_ssl = verify_ssl + + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._h2: "h2.connection.H2Connection | None" = None + self._connected = False + + self._write_lock = asyncio.Lock() + self._connect_lock = asyncio.Lock() + self._read_task: asyncio.Task | None = None + + # Per-stream tracking + self._streams: dict[int, _StreamState] = {} + + # Stats + self.total_requests = 0 + self.total_streams = 0 + + # ── Connection lifecycle ────────────────────────────────────── + + @property + def is_connected(self) -> bool: + return self._connected + + async def ensure_connected(self): + """Connect if not already connected.""" + if self._connected: + return + async with self._connect_lock: + if self._connected: + return + await self._do_connect() + + async def _do_connect(self): + """Establish the HTTP/2 connection with optimized socket settings.""" + ctx = ssl.create_default_context() + # Advertise both h2 and http/1.1 — some DPI blocks h2-only ALPN + ctx.set_alpn_protocols(["h2", "http/1.1"]) + if not self.verify_ssl: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + # Create raw TCP socket with TCP_NODELAY BEFORE TLS handshake. + # Nagle's algorithm can delay small writes (H2 frames) by up to 200ms + # waiting to coalesce — TCP_NODELAY forces immediate send. + raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + raw.setblocking(False) + + try: + await asyncio.wait_for( + asyncio.get_event_loop().sock_connect( + raw, (self.connect_host, 443) + ), + timeout=15, + ) + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection( + ssl=ctx, + server_hostname=self.sni_host, + sock=raw, + ), + timeout=15, + ) + except Exception: + raw.close() + raise + + # Verify we actually got HTTP/2 + ssl_obj = self._writer.get_extra_info("ssl_object") + negotiated = ssl_obj.selected_alpn_protocol() if ssl_obj else None + if negotiated != "h2": + self._writer.close() + raise RuntimeError( + f"H2 ALPN negotiation failed (got {negotiated!r})" + ) + + config = h2.config.H2Configuration( + client_side=True, + header_encoding="utf-8", + ) + self._h2 = h2.connection.H2Connection(config=config) + self._h2.initiate_connection() + + # Connection-level flow control: ~16MB window + self._h2.increment_flow_control_window(2 ** 24 - 65535) + + # Per-stream settings: 1MB initial window, disable server push + self._h2.update_settings({ + h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 1 * 1024 * 1024, + h2.settings.SettingCodes.ENABLE_PUSH: 0, + }) + + await self._flush() + + self._connected = True + self._read_task = asyncio.create_task(self._reader_loop()) + log.info("H2 connected → %s (SNI=%s, TCP_NODELAY=on)", + self.connect_host, self.sni_host) + + async def reconnect(self): + """Close current connection and re-establish.""" + await self._close_internal() + await self._do_connect() + + async def _close_internal(self): + self._connected = False + if self._read_task: + self._read_task.cancel() + self._read_task = None + if self._writer: + try: + self._writer.close() + except Exception: + pass + self._writer = None + # Wake all pending streams so they can raise + for state in self._streams.values(): + state.error = "Connection closed" + state.done.set() + self._streams.clear() + + # ── Public API ──────────────────────────────────────────────── + + async def request(self, method: str, path: str, host: str, + headers: dict | None = None, + body: bytes | None = None, + timeout: float = 25, + follow_redirects: int = 5) -> tuple[int, dict, bytes]: + """ + Send an HTTP/2 request and return (status, headers, body). + + Thread-safe: many concurrent calls each get their own stream. + Redirects are followed as new streams on the same connection. + """ + await self.ensure_connected() + self.total_requests += 1 + + for _ in range(follow_redirects + 1): + status, resp_headers, resp_body = await self._single_request( + method, path, host, headers, body, timeout, + ) + + if status not in (301, 302, 303, 307, 308): + return status, resp_headers, resp_body + + location = resp_headers.get("location", "") + if not location: + return status, resp_headers, resp_body + + parsed = urlparse(location) + path = parsed.path + ("?" + parsed.query if parsed.query else "") + host = parsed.netloc or host + method = "GET" + body = None + headers = None # Drop request headers on redirect + + return status, resp_headers, resp_body + + # ── Stream handling ─────────────────────────────────────────── + + async def _single_request(self, method, path, host, headers, body, + timeout) -> tuple[int, dict, bytes]: + """Send one HTTP/2 request on a new stream, wait for response.""" + if not self._connected: + await self.ensure_connected() + + stream_id = None + + async with self._write_lock: + try: + stream_id = self._h2.get_next_available_stream_id() + except Exception: + # Connection is stale — reconnect + await self.reconnect() + stream_id = self._h2.get_next_available_stream_id() + + h2_headers = [ + (":method", method), + (":path", path), + (":authority", host), + (":scheme", "https"), + ("accept-encoding", "gzip"), + ] + if headers: + for k, v in headers.items(): + h2_headers.append((k.lower(), str(v))) + + end_stream = not body + self._h2.send_headers(stream_id, h2_headers, end_stream=end_stream) + + if body: + # Send body (may need chunking for flow control) + self._send_body(stream_id, body) + + state = _StreamState() + self._streams[stream_id] = state + self.total_streams += 1 + + await self._flush() + + # Wait for complete response + try: + await asyncio.wait_for(state.done.wait(), timeout=timeout) + except asyncio.TimeoutError: + self._streams.pop(stream_id, None) + raise TimeoutError( + f"H2 stream {stream_id} timed out ({timeout}s)" + ) + + self._streams.pop(stream_id, None) + + if state.error: + raise ConnectionError(f"H2 stream error: {state.error}") + + # Auto-decompress gzip + resp_body = bytes(state.data) + if state.headers.get("content-encoding", "").lower() == "gzip": + try: + resp_body = gzip.decompress(resp_body) + except Exception: + pass + + return state.status, state.headers, resp_body + + def _send_body(self, stream_id: int, body: bytes): + """Send request body, respecting H2 flow control window.""" + # For small bodies (typical JSON payloads), send in one shot + while body: + max_size = self._h2.local_settings.max_frame_size + window = self._h2.local_flow_control_window(stream_id) + send_size = min(len(body), max_size, window) + if send_size <= 0: + # Flow control full — let the reader loop process + # window updates before we continue + break + end = send_size >= len(body) + self._h2.send_data(stream_id, body[:send_size], end_stream=end) + body = body[send_size:] + + # ── Background reader ───────────────────────────────────────── + + async def _reader_loop(self): + """Background: read H2 frames, dispatch events to waiting streams.""" + try: + while self._connected: + data = await self._reader.read(65536) + if not data: + log.warning("H2 remote closed connection") + break + + try: + events = self._h2.receive_data(data) + except Exception as e: + log.error("H2 protocol error: %s", e) + break + + for event in events: + self._dispatch(event) + + # Send pending data (acks, window updates, ping responses) + async with self._write_lock: + await self._flush() + + except asyncio.CancelledError: + pass + except Exception as e: + log.error("H2 reader error: %s", e) + finally: + self._connected = False + for state in self._streams.values(): + if not state.done.is_set(): + state.error = "Connection lost" + state.done.set() + log.info("H2 reader loop ended") + + def _dispatch(self, event): + """Route a single h2 event to its stream.""" + if isinstance(event, h2.events.ResponseReceived): + state = self._streams.get(event.stream_id) + if state: + for name, value in event.headers: + n = name if isinstance(name, str) else name.decode() + v = value if isinstance(value, str) else value.decode() + if n == ":status": + state.status = int(v) + else: + state.headers[n] = v + + elif isinstance(event, h2.events.DataReceived): + state = self._streams.get(event.stream_id) + if state: + state.data.extend(event.data) + # Always acknowledge received data for flow control + self._h2.acknowledge_received_data( + event.flow_controlled_length, event.stream_id + ) + + elif isinstance(event, h2.events.StreamEnded): + state = self._streams.get(event.stream_id) + if state: + state.done.set() + + elif isinstance(event, h2.events.StreamReset): + state = self._streams.get(event.stream_id) + if state: + state.error = f"Stream reset (code={event.error_code})" + state.done.set() + + elif isinstance(event, h2.events.WindowUpdated): + pass # h2 library handles window bookkeeping + + elif isinstance(event, h2.events.SettingsAcknowledged): + pass + + elif isinstance(event, h2.events.PingReceived): + pass # h2 library auto-responds + + elif isinstance(event, h2.events.PingAckReceived): + pass # keepalive confirmed + + # ── Internal ────────────────────────────────────────────────── + + async def _flush(self): + """Write pending H2 frame data to the socket.""" + data = self._h2.data_to_send() + if data and self._writer: + self._writer.write(data) + await self._writer.drain() + + async def close(self): + """Gracefully close the HTTP/2 connection.""" + if self._h2 and self._connected: + try: + self._h2.close_connection() + async with self._write_lock: + await self._flush() + except Exception: + pass + await self._close_internal() + + async def ping(self): + """Send an H2 PING frame to keep the connection alive.""" + if not self._connected or not self._h2: + return + try: + async with self._write_lock: + if not self._connected: + return + self._h2.ping(b"\x00" * 8) + await self._flush() + except Exception as e: + log.debug("H2 PING failed: %s", e) diff --git a/main.py b/main.py new file mode 100644 index 0000000..99b36f0 --- /dev/null +++ b/main.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +""" +DomainFront Tunnel — Bypass DPI censorship via Domain Fronting. + +Run a local HTTP proxy that tunnels all traffic through a CDN using +domain fronting: the TLS SNI shows an allowed domain while the encrypted +HTTP Host header routes to your Cloudflare Worker relay. +""" + +import argparse +import asyncio +import json +import logging +import os +import sys + +from proxy_server import ProxyServer + +__version__ = "1.0.0" + + +def setup_logging(level_name: str): + level = getattr(logging, level_name.upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s [%(name)-12s] %(levelname)-7s %(message)s", + datefmt="%H:%M:%S", + ) + + +def parse_args(): + parser = argparse.ArgumentParser( + prog="domainfront-tunnel", + description="Local HTTP proxy that tunnels traffic through domain fronting.", + ) + parser.add_argument( + "-c", "--config", + default=os.environ.get("DFT_CONFIG", "config.json"), + help="Path to config file (default: config.json, env: DFT_CONFIG)", + ) + parser.add_argument( + "-p", "--port", + type=int, + default=None, + help="Override listen port (env: DFT_PORT)", + ) + parser.add_argument( + "--host", + default=None, + help="Override listen host (env: DFT_HOST)", + ) + parser.add_argument( + "--log-level", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + default=None, + help="Override log level (env: DFT_LOG_LEVEL)", + ) + parser.add_argument( + "-v", "--version", + action="version", + version=f"%(prog)s {__version__}", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + config_path = args.config + + try: + with open(config_path) as f: + config = json.load(f) + except FileNotFoundError: + print(f"Config not found: {config_path}") + print("Copy config.example.json to config.json and fill in your values.") + sys.exit(1) + except json.JSONDecodeError as e: + print(f"Invalid JSON in config: {e}") + sys.exit(1) + + # Environment variable overrides + if os.environ.get("DFT_AUTH_KEY"): + config["auth_key"] = os.environ["DFT_AUTH_KEY"] + if os.environ.get("DFT_SCRIPT_ID"): + config["script_id"] = os.environ["DFT_SCRIPT_ID"] + + # CLI argument overrides + if args.port is not None: + config["listen_port"] = args.port + elif os.environ.get("DFT_PORT"): + config["listen_port"] = int(os.environ["DFT_PORT"]) + + if args.host is not None: + config["listen_host"] = args.host + elif os.environ.get("DFT_HOST"): + config["listen_host"] = os.environ["DFT_HOST"] + + if args.log_level is not None: + config["log_level"] = args.log_level + elif os.environ.get("DFT_LOG_LEVEL"): + config["log_level"] = os.environ["DFT_LOG_LEVEL"] + + for key in ("auth_key",): + if key not in config: + print(f"Missing required config key: {key}") + sys.exit(1) + + mode = config.get("mode", "domain_fronting") + if mode == "custom_domain" and "custom_domain" not in config: + print("Mode 'custom_domain' requires 'custom_domain' in config") + sys.exit(1) + if mode == "domain_fronting": + for key in ("front_domain", "worker_host"): + if key not in config: + print(f"Mode 'domain_fronting' requires '{key}' in config") + sys.exit(1) + if mode == "google_fronting": + if "worker_host" not in config: + print("Mode 'google_fronting' requires 'worker_host' in config (your Cloud Run URL)") + sys.exit(1) + if mode == "apps_script": + sid = config.get("script_ids") or config.get("script_id") + if not sid or (isinstance(sid, str) and sid == "YOUR_APPS_SCRIPT_DEPLOYMENT_ID"): + print("Mode 'apps_script' requires 'script_id' in config.") + print("Deploy the Apps Script from appsscript/Code.gs and paste the Deployment ID.") + sys.exit(1) + + setup_logging(config.get("log_level", "INFO")) + log = logging.getLogger("Main") + + mode = config.get("mode", "domain_fronting") + log.info("DomainFront Tunnel starting (mode: %s)", mode) + + if mode == "custom_domain": + log.info("Custom domain : %s", config["custom_domain"]) + elif mode == "google_fronting": + log.info("Google fronting : SNI=%s → Host=%s", + config.get("front_domain", "www.google.com"), config["worker_host"]) + log.info("Google IP : %s", config.get("google_ip", "216.239.38.120")) + elif mode == "apps_script": + log.info("Apps Script relay : SNI=%s → script.google.com", + config.get("front_domain", "www.google.com")) + script_ids = config.get("script_ids") or config.get("script_id") + if isinstance(script_ids, list): + log.info("Script IDs : %d scripts (round-robin)", len(script_ids)) + for i, sid in enumerate(script_ids): + log.info(" [%d] %s", i + 1, sid) + else: + log.info("Script ID : %s", script_ids) + log.info("MITM enabled — install ca/ca.crt in your browser!") + else: + log.info("Front domain (SNI) : %s", config.get("front_domain", "?")) + log.info("Worker host (Host) : %s", config.get("worker_host", "?")) + + log.info("Proxy address : %s:%d", config.get("listen_host", "127.0.0.1"), config.get("listen_port", 8080)) + + try: + asyncio.run(ProxyServer(config).start()) + except KeyboardInterrupt: + log.info("Stopped") + + +if __name__ == "__main__": + main() diff --git a/mitm.py b/mitm.py new file mode 100644 index 0000000..36fc49d --- /dev/null +++ b/mitm.py @@ -0,0 +1,153 @@ +""" +MITM certificate manager for HTTPS interception. + +Generates a CA certificate (once, stored as files) and per-domain +certificates (on the fly, cached in memory) so the local proxy can +decrypt HTTPS traffic and relay it through Apps Script. + +The user must install ca/ca.crt in their browser's trusted CAs once. + +Requires: pip install cryptography +""" + +import datetime +import logging +import os +import ssl +import tempfile + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + +log = logging.getLogger("MITM") + +CA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ca") +CA_KEY_FILE = os.path.join(CA_DIR, "ca.key") +CA_CERT_FILE = os.path.join(CA_DIR, "ca.crt") + + +class MITMCertManager: + def __init__(self): + self._ca_key = None + self._ca_cert = None + self._ctx_cache: dict[str, ssl.SSLContext] = {} + self._cert_dir = tempfile.mkdtemp(prefix="domainfront_certs_") + self._ensure_ca() + + def _ensure_ca(self): + if os.path.exists(CA_KEY_FILE) and os.path.exists(CA_CERT_FILE): + with open(CA_KEY_FILE, "rb") as f: + self._ca_key = serialization.load_pem_private_key( + f.read(), password=None + ) + with open(CA_CERT_FILE, "rb") as f: + self._ca_cert = x509.load_pem_x509_certificate(f.read()) + log.info("Loaded CA from %s", CA_DIR) + else: + self._create_ca() + + def _create_ca(self): + os.makedirs(CA_DIR, exist_ok=True) + + self._ca_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, "DomainFront Tunnel CA"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "DomainFront Tunnel"), + ]) + now = datetime.datetime.now(datetime.timezone.utc) + self._ca_cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(self._ca_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=3650)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=0), critical=True + ) + .add_extension( + x509.KeyUsage( + digital_signature=True, + key_cert_sign=True, + crl_sign=True, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + .sign(self._ca_key, hashes.SHA256()) + ) + + with open(CA_KEY_FILE, "wb") as f: + f.write( + self._ca_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption(), + ) + ) + with open(CA_CERT_FILE, "wb") as f: + f.write(self._ca_cert.public_bytes(serialization.Encoding.PEM)) + + log.warning("Generated new CA certificate: %s", CA_CERT_FILE) + log.warning(">>> Install this file in your browser's Trusted Root CAs! <<<") + + def get_server_context(self, domain: str) -> ssl.SSLContext: + if domain not in self._ctx_cache: + key_pem, cert_pem = self._generate_domain_cert(domain) + + cert_file = os.path.join(self._cert_dir, f"{domain}.crt") + key_file = os.path.join(self._cert_dir, f"{domain}.key") + + ca_pem = self._ca_cert.public_bytes(serialization.Encoding.PEM) + with open(cert_file, "wb") as f: + f.write(cert_pem + ca_pem) + with open(key_file, "wb") as f: + f.write(key_pem) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ctx.set_alpn_protocols(["http/1.1"]) + ctx.load_cert_chain(cert_file, key_file) + self._ctx_cache[domain] = ctx + + return self._ctx_cache[domain] + + def _generate_domain_cert(self, domain: str): + key = rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + subject = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, domain), + ]) + now = datetime.datetime.now(datetime.timezone.utc) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(self._ca_cert.subject) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=365)) + .add_extension( + x509.SubjectAlternativeName([x509.DNSName(domain)]), + critical=False, + ) + .sign(self._ca_key, hashes.SHA256()) + ) + + key_pem = key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption(), + ) + cert_pem = cert.public_bytes(serialization.Encoding.PEM) + return key_pem, cert_pem diff --git a/proxy_server.py b/proxy_server.py new file mode 100644 index 0000000..5ade0ab --- /dev/null +++ b/proxy_server.py @@ -0,0 +1,520 @@ +""" +Local HTTP proxy server. + +Intercepts the user's browser traffic and forwards everything through +a domain-fronted connection to a CDN worker or Apps Script relay. + +Supports: + - CONNECT method → WebSocket tunnel (modes 1-3) or MITM relay (apps_script) + - GET / POST etc. → HTTP forwarding (modes 1-3) or JSON relay (apps_script) +""" + +import asyncio +import logging +import re +import time + +from domain_fronter import DomainFronter + +log = logging.getLogger("Proxy") + + +class ResponseCache: + """Simple LRU response cache — avoids repeated relay calls.""" + + def __init__(self, max_mb: int = 50): + self._store: dict[str, tuple[bytes, float]] = {} + self._size = 0 + self._max = max_mb * 1024 * 1024 + self.hits = 0 + self.misses = 0 + + def get(self, url: str) -> bytes | None: + entry = self._store.get(url) + if not entry: + self.misses += 1 + return None + raw, expires = entry + if time.time() > expires: + self._size -= len(raw) + del self._store[url] + self.misses += 1 + return None + self.hits += 1 + return raw + + def put(self, url: str, raw_response: bytes, ttl: int = 300): + size = len(raw_response) + if size > self._max // 4 or size == 0: + return + # Evict oldest to make room + while self._size + size > self._max and self._store: + oldest = next(iter(self._store)) + self._size -= len(self._store[oldest][0]) + del self._store[oldest] + if url in self._store: + self._size -= len(self._store[url][0]) + self._store[url] = (raw_response, time.time() + ttl) + self._size += size + + @staticmethod + def parse_ttl(raw_response: bytes, url: str) -> int: + """Determine cache TTL from response headers and URL.""" + hdr_end = raw_response.find(b"\r\n\r\n") + if hdr_end < 0: + return 0 + hdr = raw_response[:hdr_end].decode(errors="replace").lower() + + # Don't cache errors or non-200 + if b"HTTP/1.1 200" not in raw_response[:20]: + return 0 + if "no-store" in hdr: + return 0 + + # Explicit max-age + m = re.search(r"max-age=(\d+)", hdr) + if m: + return min(int(m.group(1)), 86400) + + # Heuristic by content type / extension + path = url.split("?")[0].lower() + static_exts = ( + ".css", ".js", ".woff", ".woff2", ".ttf", ".eot", + ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico", + ".mp3", ".mp4", ".wasm", + ) + for ext in static_exts: + if path.endswith(ext): + return 3600 # 1 hour for static assets + + ct_m = re.search(r"content-type:\s*([^\r\n]+)", hdr) + ct = ct_m.group(1) if ct_m else "" + if "image/" in ct or "font/" in ct: + return 3600 + if "text/css" in ct or "javascript" in ct: + return 1800 + if "text/html" in ct or "application/json" in ct: + return 0 # don't cache dynamic content by default + + return 0 + + +class ProxyServer: + def __init__(self, config: dict): + self.host = config.get("listen_host", "127.0.0.1") + self.port = config.get("listen_port", 8080) + self.mode = config.get("mode", "domain_fronting") + self.fronter = DomainFronter(config) + self.mitm = None + self._cache = ResponseCache(max_mb=50) + + # Persistent HTTP tunnel cache for google_fronting mode + # Key: "host:port" → (tunnel_reader, tunnel_writer, lock) + self._http_tunnels: dict = {} + self._tunnel_lock = asyncio.Lock() + + if self.mode == "apps_script": + try: + from mitm import MITMCertManager + self.mitm = MITMCertManager() + except ImportError: + log.error("apps_script mode requires 'cryptography' package.") + log.error("Run: pip install cryptography") + raise SystemExit(1) + + async def start(self): + srv = await asyncio.start_server(self._on_client, self.host, self.port) + log.info( + "Listening on %s:%d — configure your browser HTTP proxy to this address", + self.host, self.port, + ) + async with srv: + await srv.serve_forever() + + # ── client handler ──────────────────────────────────────────── + + async def _on_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + addr = writer.get_extra_info("peername") + try: + first_line = await asyncio.wait_for(reader.readline(), timeout=30) + if not first_line: + return + + # Read remaining headers + header_block = first_line + while True: + line = await asyncio.wait_for(reader.readline(), timeout=10) + header_block += line + if line in (b"\r\n", b"\n", b""): + break + + request_line = first_line.decode(errors="replace").strip() + parts = request_line.split(" ", 2) + if len(parts) < 2: + return + + method = parts[0].upper() + + if method == "CONNECT": + await self._do_connect(parts[1], reader, writer) + else: + await self._do_http(header_block, reader, writer) + + except asyncio.TimeoutError: + log.debug("Timeout: %s", addr) + except Exception as e: + log.error("Error (%s): %s", addr, e) + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + # ── CONNECT (HTTPS tunnelling) ──────────────────────────────── + + async def _do_connect(self, target: str, reader, writer): + host, _, port = target.rpartition(":") + port = int(port) if port else 443 + if not host: + host, port = target, 443 + + log.info("CONNECT → %s:%d", host, port) + + writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n") + await writer.drain() + + if self.mode == "apps_script": + # Google services: tunnel directly (no MITM) to avoid + # Google's anti-bot detection from Apps Script IPs/UA. + if self._is_google_domain(host): + log.info("Direct tunnel → %s (Google domain, skipping relay)", host) + await self._do_direct_tunnel(host, port, reader, writer) + else: + await self._do_mitm_connect(host, port, reader, writer) + else: + await self.fronter.tunnel(host, port, reader, writer) + + # ── Google domain detection ─────────────────────────────────── + + # Only domains whose SNI the ISP does NOT block. + # YouTube/googlevideo are blocked by SNI inspection in Iran, + # so they MUST go through the MITM relay (domain-fronted). + _GOOGLE_SUFFIXES = ( + ".google.com", ".google.co", + ".googleapis.com", ".gstatic.com", + ".googleusercontent.com", + ) + _GOOGLE_EXACT = { + "google.com", "gstatic.com", "googleapis.com", + } + + def _is_google_domain(self, host: str) -> bool: + """Return True if host is a Google-owned domain.""" + h = host.lower().rstrip(".") + if h in self._GOOGLE_EXACT: + return True + for suffix in self._GOOGLE_SUFFIXES: + if h.endswith(suffix): + return True + return False + + # ── Direct tunnel (no MITM) ─────────────────────────────────── + + async def _do_direct_tunnel(self, host: str, port: int, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): + """Pipe raw TLS bytes directly to the target server. + + Used for Google domains: the browser's TLS goes end-to-end + with Google, preserving real User-Agent and avoiding + Apps Script IP/bot-detection issues. + """ + google_ip = self.fronter.connect_host + try: + r_remote, w_remote = await asyncio.wait_for( + asyncio.open_connection(google_ip, port), timeout=10 + ) + except Exception as e: + log.error("Direct tunnel connect failed (%s via %s): %s", + host, google_ip, e) + return + + async def pipe(src, dst, label): + try: + while True: + data = await src.read(65536) + if not data: + break + dst.write(data) + await dst.drain() + except (ConnectionError, asyncio.CancelledError): + pass + except Exception as e: + log.debug("Pipe %s ended: %s", label, e) + finally: + try: + dst.close() + except Exception: + pass + + await asyncio.gather( + pipe(reader, w_remote, f"client→{host}"), + pipe(r_remote, writer, f"{host}→client"), + ) + + # ── MITM CONNECT (apps_script mode) ─────────────────────────── + + async def _do_mitm_connect(self, host: str, port: int, reader, writer): + """Intercept TLS, decrypt HTTP, and relay through Apps Script.""" + ssl_ctx = self.mitm.get_server_context(host) + + # Upgrade the existing connection to TLS (we are the server) + loop = asyncio.get_event_loop() + transport = writer.transport + protocol = transport.get_protocol() + + try: + new_transport = await loop.start_tls( + transport, protocol, ssl_ctx, server_side=True, + ) + except Exception as e: + log.error("TLS handshake failed for %s: %s", host, e) + return + + # Update writer to use the new TLS transport + writer._transport = new_transport + + # Read and relay HTTP requests from the browser (now decrypted) + while True: + try: + first_line = await asyncio.wait_for(reader.readline(), timeout=120) + if not first_line: + break + + header_block = first_line + while True: + line = await asyncio.wait_for(reader.readline(), timeout=10) + header_block += line + if line in (b"\r\n", b"\n", b""): + break + + # Read body + body = b"" + for raw_line in header_block.split(b"\r\n"): + if raw_line.lower().startswith(b"content-length:"): + length = int(raw_line.split(b":", 1)[1].strip()) + body = await reader.readexactly(length) + break + + # Parse the request + request_line = first_line.decode(errors="replace").strip() + parts = request_line.split(" ", 2) + if len(parts) < 2: + break + + method = parts[0] + path = parts[1] + + # Parse headers + headers = {} + for raw_line in header_block.split(b"\r\n")[1:]: + if b":" in raw_line: + k, v = raw_line.decode(errors="replace").split(":", 1) + headers[k.strip()] = v.strip() + + # Build full URL (browser sends just the path in CONNECT) + if port == 443: + url = f"https://{host}{path}" + else: + url = f"https://{host}:{port}{path}" + + log.info("MITM → %s %s", method, url) + + # Check local cache first (GET only) + response = None + if method == "GET" and not body: + response = self._cache.get(url) + if response: + log.debug("Cache HIT: %s", url[:60]) + + if response is None: + # Relay through Apps Script + try: + response = await self._relay_smart(method, url, headers, body) + except Exception as e: + log.error("Relay error (%s): %s", url[:60], e) + err_body = f"Relay error: {e}".encode() + response = ( + b"HTTP/1.1 502 Bad Gateway\r\n" + b"Content-Type: text/plain\r\n" + b"Content-Length: " + str(len(err_body)).encode() + b"\r\n" + b"\r\n" + err_body + ) + + # Cache successful GET responses + if method == "GET" and not body and response: + ttl = ResponseCache.parse_ttl(response, url) + if ttl > 0: + self._cache.put(url, response, ttl) + log.debug("Cached (%ds): %s", ttl, url[:60]) + + writer.write(response) + await writer.drain() + + except asyncio.TimeoutError: + break + except asyncio.IncompleteReadError: + break + except ConnectionError: + break + except Exception as e: + log.error("MITM handler error (%s): %s", host, e) + break + + async def _relay_smart(self, method, url, headers, body): + """Choose optimal relay strategy based on request type. + + ALL GET requests go through relay_parallel: it does one probe + request and only splits into parallel chunks if the response + is large and the server supports ranges. Small responses still + use a single request (no overhead). + """ + if method == "GET" and not body: + # Skip parallel-range if the client already sent a Range header + # (we must forward it verbatim, not modify it). + if headers: + for k in headers: + if k.lower() == "range": + return await self.fronter.relay( + method, url, headers, body + ) + return await self.fronter.relay_parallel( + method, url, headers, body + ) + return await self.fronter.relay(method, url, headers, body) + + def _is_likely_download(self, url: str, headers: dict) -> bool: + """Heuristic: is this URL likely a large file download?""" + # Check file extension + path = url.split("?")[0].lower() + large_exts = { + ".zip", ".tar", ".gz", ".bz2", ".xz", ".7z", ".rar", + ".exe", ".msi", ".dmg", ".deb", ".rpm", ".apk", + ".iso", ".img", + ".mp4", ".mkv", ".avi", ".mov", ".webm", + ".mp3", ".flac", ".wav", ".aac", + ".pdf", ".doc", ".docx", ".ppt", ".pptx", + ".wasm", + } + for ext in large_exts: + if path.endswith(ext): + return True + return False + + # ── Plain HTTP forwarding ───────────────────────────────────── + + async def _do_http(self, header_block: bytes, reader, writer): + body = b"" + for raw_line in header_block.split(b"\r\n"): + if raw_line.lower().startswith(b"content-length:"): + length = int(raw_line.split(b":", 1)[1].strip()) + body = await reader.readexactly(length) + break + + first_line = header_block.split(b"\r\n")[0].decode(errors="replace") + log.info("HTTP → %s", first_line) + + if self.mode == "apps_script": + # Parse request and relay through Apps Script + parts = first_line.strip().split(" ", 2) + method = parts[0] if parts else "GET" + url = parts[1] if len(parts) > 1 else "/" + + headers = {} + for raw_line in header_block.split(b"\r\n")[1:]: + if b":" in raw_line: + k, v = raw_line.decode(errors="replace").split(":", 1) + headers[k.strip()] = v.strip() + + # Cache check for GET + response = None + if method == "GET" and not body: + response = self._cache.get(url) + if response: + log.debug("Cache HIT (HTTP): %s", url[:60]) + + if response is None: + response = await self._relay_smart(method, url, headers, body) + # Cache successful GET + if method == "GET" and not body and response: + ttl = ResponseCache.parse_ttl(response, url) + if ttl > 0: + self._cache.put(url, response, ttl) + elif self.mode in ("google_fronting", "custom_domain", "domain_fronting"): + # Use WebSocket tunnel for ALL traffic (much faster than forward()) + response = await self._tunnel_http(header_block, body) + else: + response = await self.fronter.forward(header_block + body) + + writer.write(response) + await writer.drain() + + async def _tunnel_http(self, header_block: bytes, body: bytes) -> bytes: + """Forward plain HTTP via a persistent WebSocket tunnel. + + Instead of opening a new TLS+HTTP connection for each request + (the old forward() path), this keeps a WebSocket tunnel open + to the target host and pipes raw HTTP through it. + Much faster for rapid-fire requests (e.g., Telegram API). + """ + import re as _re + + # Parse target host:port from the raw HTTP request + host = "" + port = 80 + for line in header_block.split(b"\r\n")[1:]: + if not line: + break + if line.lower().startswith(b"host:"): + host_val = line.split(b":", 1)[1].strip().decode(errors="replace") + if ":" in host_val: + h, p = host_val.rsplit(":", 1) + try: + host, port = h, int(p) + except ValueError: + host = host_val + else: + host = host_val + break + + if not host: + return b"HTTP/1.1 400 Bad Request\r\n\r\nNo Host header\r\n" + + # Rewrite the request line: browser sends absolute URL + # (e.g., "GET http://host/path HTTP/1.1") but the target + # server expects a relative path ("GET /path HTTP/1.1") + first_line = header_block.split(b"\r\n")[0] + first_str = first_line.decode(errors="replace") + parts = first_str.split(" ", 2) + if len(parts) >= 2 and parts[1].startswith("http://"): + from urllib.parse import urlparse + parsed = urlparse(parts[1]) + rel_path = parsed.path or "/" + if parsed.query: + rel_path += "?" + parsed.query + new_first = f"{parts[0]} {rel_path}" + if len(parts) == 3: + new_first += f" {parts[2]}" + header_block = new_first.encode() + b"\r\n" + b"\r\n".join(header_block.split(b"\r\n")[1:]) + + raw_request = header_block + body + + # Send through tunnel + try: + return await asyncio.wait_for( + self.fronter.forward(raw_request), timeout=30 + ) + except Exception as e: + log.error("Tunnel HTTP failed (%s:%d): %s", host, port, e) + return b"HTTP/1.1 502 Bad Gateway\r\n\r\nTunnel forward failed\r\n" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5ff2a6a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +# Core (no external dependencies for basic modes) +# Python 3.10+ required + +# Optional: MITM interception (apps_script mode) +cryptography>=41.0.0 + +# Optional: HTTP/2 multiplexing (faster apps_script relay) +h2>=4.1.0 diff --git a/ws.py b/ws.py new file mode 100644 index 0000000..0c3ed22 --- /dev/null +++ b/ws.py @@ -0,0 +1,76 @@ +""" +Minimal WebSocket frame encoder / decoder (RFC 6455). + +Only handles binary (opcode 0x02) and close (opcode 0x08) frames. +Client-to-server frames are always masked as required by the spec. +""" + +import os +import struct + + +def ws_encode(data: bytes, opcode: int = 0x02) -> bytes: + """Encode *data* into a masked binary WebSocket frame.""" + head = bytearray([0x80 | opcode]) # FIN + opcode + + length = len(data) + if length < 126: + head.append(0x80 | length) + elif length < 0x10000: + head.append(0x80 | 126) + head += struct.pack("!H", length) + else: + head.append(0x80 | 127) + head += struct.pack("!Q", length) + + mask = os.urandom(4) + head += mask + + masked = bytearray(data) + for i in range(len(masked)): + masked[i] ^= mask[i & 3] + + return bytes(head) + bytes(masked) + + +def ws_decode(buf: bytes): + """Try to decode one frame from *buf*. + + Returns ``(opcode, payload, consumed_bytes)`` or ``None`` if the + buffer does not yet contain a complete frame. + """ + if len(buf) < 2: + return None + + opcode = buf[0] & 0x0F + is_masked = buf[1] & 0x80 + length = buf[1] & 0x7F + pos = 2 + + if length == 126: + if len(buf) < 4: + return None + length = struct.unpack("!H", buf[2:4])[0] + pos = 4 + elif length == 127: + if len(buf) < 10: + return None + length = struct.unpack("!Q", buf[2:10])[0] + pos = 10 + + mask = None + if is_masked: + if len(buf) < pos + 4: + return None + mask = buf[pos : pos + 4] + pos += 4 + + if len(buf) < pos + length: + return None + + payload = bytearray(buf[pos : pos + length]) + if mask: + for i in range(len(payload)): + payload[i] ^= mask[i & 3] + + return opcode, bytes(payload), pos + length