feat: implement local HTTP proxy server with MITM capabilities

- Added a new proxy server that intercepts browser traffic and forwards requests through an Apps Script relay.
- Implemented response caching to optimize repeated requests.
- Included support for SOCKS5 proxy connections.
- Added handling for CORS preflight requests and response injection.
- Integrated domain-based policies for blocking and bypassing specific hosts.
- Enhanced error handling and logging for better debugging.
This commit is contained in:
Abolfazl
2026-04-22 05:19:59 +03:30
parent c20a9f6fcd
commit acf75dcfd7
14 changed files with 1048 additions and 235 deletions
+37 -11
View File
@@ -69,7 +69,7 @@ This is the "relay" that sits on Google's servers and fetches websites for you.
1. Open [Google Apps Script](https://script.google.com/) and sign in with your Google account.
2. Click **New project**.
3. **Delete** all the default code in the editor.
4. Open the [`Code.gs`](Code.gs) file from this project, **copy everything**, and paste it into the Apps Script editor.
4. Open the [`Code.gs`](apps_script/Code.gs) file from this project (under `apps_script/`), **copy everything**, and paste it into the Apps Script editor.
5. **Important:** Change the password on this line to something only you know:
```javascript
const AUTH_KEY = "your-secret-password-here";
@@ -200,6 +200,21 @@ This project focuses entirely on the **Apps Script** relay — a free Google acc
| `front_domain` | `www.google.com` | Domain shown to the firewall/filter |
| `verify_ssl` | `true` | Verify TLS certificates |
| `script_ids` | — | Multiple Script IDs for load balancing (array) |
| `block_hosts` | `[]` | Hosts that must never be tunneled (return HTTP 403). Supports exact names (`ads.example.com`) or leading-dot suffixes (`.doubleclick.net`). |
| `bypass_hosts` | `["localhost", ".local", ".lan", ".home.arpa"]` | Hosts that go direct (no MITM, no relay). Useful for LAN resources or sites that break under MITM. |
| `direct_google_exclude` | see [config.example.json](config.example.json) | Google apps that must use the MITM relay path instead of the fast direct tunnel. |
| `hosts` | `{}` | Manual DNS override: map a hostname to a specific IP. |
### Optional Dependencies
Install everything from [`requirements.txt`](requirements.txt). All listed packages are optional — the proxy runs with no third-party dependencies in basic modes, but without them you lose features:
| Package | Provides |
|---------|----------|
| `cryptography` | MITM TLS interception (required for `apps_script` mode with HTTPS sites) |
| `h2` | HTTP/2 multiplexing to the Apps Script relay (significantly faster) |
| `brotli` | Decompression of `Content-Encoding: br` responses |
| `zstandard` | Decompression of `Content-Encoding: zstd` responses |
### Load Balancing
@@ -255,16 +270,26 @@ python3 main.py --no-cert-check # Skip automatic CA install check on st
## Project Files
| File | What It Does |
|------|-------------|
| `main.py` | Starts the proxy |
| `proxy_server.py` | Handles browser connections |
| `domain_fronter.py` | Apps Script relay client (fronted through Google) |
| `h2_transport.py` | Faster connections using HTTP/2 (optional) |
| `mitm.py` | Handles HTTPS certificate generation |
| `cert_installer.py` | Cross-platform CA certificate installer (Windows/macOS/Linux + Firefox) |
| `Code.gs` | The relay script you deploy to Google Apps Script |
| `config.example.json` | Example config — copy to `config.json` |
```
MasterHttpRelayVPN/
├── main.py # Entry point: starts the proxy
├── config.example.json # Copy to config.json and fill in your values
├── requirements.txt # Optional Python dependencies
├── apps_script/
│ └── Code.gs # The relay script you deploy to Google Apps Script
├── ca/ # Generated MITM CA (do NOT share)
│ ├── ca.crt
│ └── ca.key
└── src/ # Proxy implementation
├── proxy_server.py # Accepts HTTP CONNECT and SOCKS5
├── domain_fronter.py # Apps Script relay client (fronted through Google)
├── h2_transport.py # Optional HTTP/2 multiplexing
├── mitm.py # On-the-fly TLS interception
├── cert_installer.py # Cross-platform CA installer (Windows/macOS/Linux + Firefox)
├── codec.py # Content-Encoding decoder (gzip/deflate/br/zstd)
├── constants.py # Tunable defaults and shared data
└── logging_utils.py # Colored, aligned log formatter
```
---
@@ -280,6 +305,7 @@ python3 main.py --no-cert-check # Skip automatic CA install check on st
| Connection timeout | Try a different `google_ip` or check your internet connection |
| Slow browsing | Deploy multiple `Code.gs` copies and use `script_ids` array for load balancing |
| `502 Bad JSON` error | Google returned an unexpected response (HTML instead of JSON). Causes: wrong `script_id`, Apps Script daily quota exhausted, or the deployment wasn't re-created after editing `Code.gs`. Check your `script_id` and create a **new deployment** if you recently changed `Code.gs`. |
| Telegram works on HTTP proxy but not on SOCKS5 | **Expected.** SOCKS5 clients resolve hostnames locally and connect to raw IPs, so Telegram's MTProto-obfuscated bytes reach a blocked IP that we can neither direct-tunnel nor intercept. Configure Telegram as an **HTTP proxy** (`127.0.0.1:8085`) instead — it sends hostnames, which the proxy handles via SNI-rewrite through Google. |
---
+36 -11
View File
@@ -63,7 +63,7 @@ pip install -r requirements.txt
1. وارد [Google Apps Script](https://script.google.com/) شوید.
2. روی **New project** کلیک کنید.
3. کد پیش‌فرض را کامل حذف کنید.
4. فایل `Code.gs` همین پروژه را باز کنید، همه محتوای آن را کپی کنید و داخل Apps Script قرار دهید.
4. فایل `apps_script/Code.gs` همین پروژه را باز کنید، همه محتوای آن را کپی کنید و داخل Apps Script قرار دهید.
5. این خط را به یک رمز دلخواه و امن تغییر دهید:
```javascript
const AUTH_KEY = "your-secret-password-here";
@@ -188,6 +188,20 @@ Firefox معمولا certificate store جداگانه دارد:
| `front_domain` | `www.google.com` | دامنه‌ای که فیلتر می‌بیند |
| `verify_ssl` | `true` | بررسی اعتبار TLS |
| `script_ids` | - | چند Deployment ID برای load balancing |
| `block_hosts` | `[]` | هاست‌هایی که هرگز نباید tunnel شوند (پاسخ 403). نام دقیق (`ads.example.com`) یا پسوند با نقطه‌ی ابتدایی (`.doubleclick.net`). |
| `bypass_hosts` | `["localhost", ".local", ".lan", ".home.arpa"]` | هاست‌هایی که مستقیم می‌روند (بدون MITM و بدون رله). برای منابع داخلی شبکه یا سایت‌هایی که با MITM مشکل دارند. |
| `direct_google_exclude` | مراجعه به [config.example.json](config.example.json) | اپ‌های Google که باید از مسیر MITM برای رله استفاده کنند به‌جای tunnel مستقیم. |
### وابستگی‌های اختیاری
همه وابستگی‌های [`requirements.txt`](requirements.txt) اختیاری هستند — در حالت پایه بدون هیچ‌کدام کار می‌کند، ولی با نصب آن‌ها امکانات بیشتری در دسترس است:
| بسته | کاربرد |
|------|---------|
| `cryptography` | رمزگشایی MITM برای HTTPS (در حالت `apps_script` لازم است) |
| `h2` | ارتباط HTTP/2 با رله Apps Script (به‌طور محسوسی سریع‌تر) |
| `brotli` | پشتیبانی از فشرده‌سازی `Content-Encoding: br` |
| `zstandard` | پشتیبانی از فشرده‌سازی `Content-Encoding: zstd` |
### استفاده از چند Script ID
@@ -241,16 +255,26 @@ python3 main.py --no-cert-check # رد شدن از بررسی خودکار
## فایل‌های پروژه
| فایل | کاربرد |
|------|--------|
| `main.py` | اجرای برنامه |
| `proxy_server.py` | مدیریت اتصال مرورگر |
| `domain_fronter.py` | کلاینت رله Apps Script (با عبور از Google) |
| `h2_transport.py` | ارتباط سریع‌تر با HTTP/2 |
| `mitm.py` | ساخت و مدیریت certificate |
| `cert_installer.py` | نصب خودکار گواهی CA در ویندوز، مک، لینوکس و Firefox |
| `Code.gs` | رله Apps Script |
| `config.example.json` | فایل نمونه تنظیمات |
```
MasterHttpRelayVPN/
├── main.py # نقطه شروع: پراکسی را راه‌اندازی می‌کند
├── config.example.json # نمونه کانفیگ (به config.json کپی شود)
├── requirements.txt # وابستگی‌های اختیاری پایتون
├── apps_script/
│ └── Code.gs # اسکریپت رله روی Google Apps Script
├── ca/ # گواهی MITM (هرگز به اشتراک نگذارید)
│ ├── ca.crt
│ └── ca.key
└── src/ # پیاده‌سازی پراکسی
├── proxy_server.py # دریافت CONNECT و SOCKS5
├── domain_fronter.py # کلاینت رله Apps Script (fronted از طریق Google)
├── h2_transport.py # ارتباط HTTP/2 (اختیاری)
├── mitm.py # ساخت و مدیریت گواهی‌ها
├── cert_installer.py # نصب خودکار CA در ویندوز/مک/لینوکس + فایرفاکس
├── codec.py # رمزگشای Content-Encoding (gzip/deflate/br/zstd)
├── constants.py # مقادیر پیش‌فرض قابل تنظیم
└── logging_utils.py # فرمت‌دهنده‌ی لاگ رنگی و منظم
```
---
@@ -266,6 +290,7 @@ python3 main.py --no-cert-check # رد شدن از بررسی خودکار
| timeout | IP دیگری برای Google امتحان کنید |
| سرعت کم | از چند `script_id` برای load balancing استفاده کنید |
| خطای `502 Bad JSON` | Google به‌جای JSON پاسخ HTML برگردانده (مثلاً صفحه quota یا 404). دلایل: `script_id` اشتباه، تجاوز از سهمیه روزانه Apps Script، یا عدم ایجاد deployment جدید پس از ویرایش `Code.gs`. `script_id` را بررسی کنید و یک **deployment جدید** بسازید. |
| تلگرام روی HTTP proxy کار می‌کند ولی روی SOCKS5 نه | **طبیعی است.** کلاینت SOCKS5 نام دامنه را روی سیستم خودش resolve می‌کند و مستقیم به IP وصل می‌شود، پس بایت‌های MTProto تلگرام به IP فیلترشده می‌رسد که نه می‌توانیم direct-tunnel کنیم و نه MITM. تلگرام را به‌جای SOCKS5 به صورت **HTTP proxy** (`127.0.0.1:8085`) تنظیم کنید — در این حالت نام دامنه ارسال می‌شود و پراکسی با SNI-rewrite از طریق Google عبور می‌دهد. |
---
View File
+26 -8
View File
@@ -1,21 +1,39 @@
{
"_comment": "Copy this file to config.json and fill in your values",
"mode": "apps_script",
"google_ip": "216.239.38.120",
"front_domain": "www.google.com",
"script_id": "YOUR_APPS_SCRIPT_DEPLOYMENT_ID",
"auth_key": "CHANGE_ME_TO_A_STRONG_SECRET",
"listen_host": "127.0.0.1",
"listen_port": 8085,
"socks5_enabled": true,
"socks5_host": "127.0.0.1",
"listen_port": 8085,
"socks5_port": 1080,
"log_level": "INFO",
"verify_ssl": true,
"_direct_google_exclude_comment": "Google web apps that should NEVER use the raw direct-tunnel shortcut. Supports exact hosts and optional suffix patterns like \".googleapis.com\". They will go through the MITM relay path instead for better compatibility.",
"direct_google_exclude": ["gemini.google.com", "aistudio.google.com", "notebooklm.google.com", "labs.google.com", "meet.google.com", "accounts.google.com", "ogs.google.com", "mail.google.com", "calendar.google.com", "drive.google.com", "docs.google.com", "chat.google.com"],
"_direct_google_allow_comment": "Conservative allowlist for raw direct Google tunneling. Leave empty unless you have confirmed a host works better direct than via relay.",
"direct_google_allow": ["www.google.com", "safebrowsing.google.com"],
"_hosts_comment": "Optional SNI-rewrite overrides. YouTube, googlevideo, gstatic, fonts.googleapis.com, ytimg, ggpht, doubleclick, etc. are ALREADY handled automatically (routed via google_ip with SNI=front_domain, same trick as the Xray MITM-DomainFronting config). Add entries here only for custom domains, e.g. \"example.com\": \"216.239.38.120\".",
"block_hosts": [],
"bypass_hosts": [
"localhost",
".local",
".lan",
".home.arpa"
],
"direct_google_exclude": [
"gemini.google.com",
"aistudio.google.com",
"notebooklm.google.com",
"labs.google.com",
"meet.google.com",
"accounts.google.com",
"ogs.google.com",
"mail.google.com",
"calendar.google.com",
"drive.google.com",
"docs.google.com",
"chat.google.com"
],
"direct_google_allow": [
"www.google.com",
"safebrowsing.google.com"
],
"hosts": {}
}
+34 -9
View File
@@ -14,20 +14,28 @@ import logging
import os
import sys
# Project modules live under ./src — put that folder on sys.path so the
# historical flat imports ("from proxy_server import …") keep working.
_SRC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "src")
if _SRC_DIR not in sys.path:
sys.path.insert(0, _SRC_DIR)
from cert_installer import install_ca, is_ca_trusted
from constants import __version__
from logging_utils import configure as configure_logging, print_banner
from mitm import CA_CERT_FILE
from proxy_server import ProxyServer
__version__ = "1.0.0"
def setup_logging(level_name: str):
level = getattr(logging, level_name.upper(), logging.INFO)
logging.basicConfig(
level=level,
format="%(asctime)s [%(name)-12s] %(levelname)-7s %(message)s",
datefmt="%H:%M:%S",
)
configure_logging(level_name)
_PLACEHOLDER_AUTH_KEYS = {
"",
"CHANGE_ME_TO_A_STRONG_SECRET",
"your-secret-password-here",
}
def parse_args():
@@ -136,6 +144,14 @@ def main():
print(f"Missing required config key: {key}")
sys.exit(1)
if config.get("auth_key", "") in _PLACEHOLDER_AUTH_KEYS:
print(
"Refusing to start: 'auth_key' is unset or uses a known placeholder.\n"
"Pick a long random secret and set it in both config.json AND "
"the AUTH_KEY constant inside Code.gs (they must match)."
)
sys.exit(1)
# Always Apps Script mode — force-set for backward-compat configs.
config["mode"] = "apps_script"
sid = config.get("script_ids") or config.get("script_id")
@@ -155,6 +171,7 @@ def main():
setup_logging(config.get("log_level", "INFO"))
log = logging.getLogger("Main")
print_banner(__version__)
log.info("DomainFront Tunnel starting (Apps Script relay)")
log.info("Apps Script relay : SNI=%s → script.google.com",
@@ -197,10 +214,18 @@ def main():
config.get("socks5_port", 1080))
try:
asyncio.run(ProxyServer(config).start())
asyncio.run(_run(config))
except KeyboardInterrupt:
log.info("Stopped")
async def _run(config):
server = ProxyServer(config)
try:
await server.start()
finally:
await server.stop()
if __name__ == "__main__":
main()
+6
View File
@@ -6,3 +6,9 @@ cryptography>=41.0.0
# Optional: HTTP/2 multiplexing (faster apps_script relay)
h2>=4.1.0
# Optional: Brotli decompression (modern websites send `br` encoding)
brotli>=1.1.0
# Optional: Zstandard decompression (some CDNs now serve `zstd`)
zstandard>=0.22.0
+1 -1
View File
@@ -18,7 +18,7 @@ import subprocess
import sys
import tempfile
log = logging.getLogger("CertInstaller")
log = logging.getLogger("Cert")
CERT_NAME = "MasterHttpRelayVPN"
+92
View File
@@ -0,0 +1,92 @@
"""
Content-Encoding decoders: gzip (stdlib), brotli (optional), zstd (optional).
`decode(body, encoding)` returns the decoded bytes, or the original bytes
on any error. Use `supported_encodings()` to build an Accept-Encoding value.
"""
from __future__ import annotations
import gzip
import logging
import zlib
log = logging.getLogger("Codec")
try:
import brotli # type: ignore
_HAS_BR = True
except ImportError: # pragma: no cover
brotli = None # type: ignore
_HAS_BR = False
try:
import zstandard as _zstd # type: ignore
_HAS_ZSTD = True
_ZSTD_DCTX = _zstd.ZstdDecompressor()
except ImportError: # pragma: no cover
_zstd = None # type: ignore
_HAS_ZSTD = False
_ZSTD_DCTX = None
def supported_encodings() -> str:
"""Value for Accept-Encoding that this relay can actually decode."""
codecs = ["gzip", "deflate"]
if _HAS_BR:
codecs.append("br")
if _HAS_ZSTD:
codecs.append("zstd")
return ", ".join(codecs)
def has_brotli() -> bool:
return _HAS_BR
def has_zstd() -> bool:
return _HAS_ZSTD
def decode(body: bytes, encoding: str) -> bytes:
"""Decode *body* according to Content-Encoding.
Returns the original bytes if the encoding is empty, unknown, or
decompression fails (so the caller can safely pass through).
"""
if not body:
return body
enc = (encoding or "").strip().lower()
if not enc or enc == "identity":
return body
# Multi-coding (rare): "gzip, br" means brotli(gzip(data))
if "," in enc:
for layer in reversed([s.strip() for s in enc.split(",") if s.strip()]):
body = decode(body, layer)
return body
try:
if enc == "gzip":
return gzip.decompress(body)
if enc == "deflate":
try:
return zlib.decompress(body)
except zlib.error:
# Some servers send raw deflate without zlib wrapper.
return zlib.decompress(body, -zlib.MAX_WBITS)
if enc == "br":
if not _HAS_BR:
log.debug("brotli not installed — body passed through")
return body
return brotli.decompress(body)
if enc == "zstd":
if not _HAS_ZSTD:
log.debug("zstandard not installed — body passed through")
return body
return _ZSTD_DCTX.decompress(body)
except Exception as exc:
log.debug("decompress (%s) failed: %s — returning raw", enc, exc)
return body
return body
+155
View File
@@ -0,0 +1,155 @@
"""
Central location for tunable constants used across the project.
Values here are chosen for safe defaults; individual entries may be
overridden from `config.json` where noted.
"""
from __future__ import annotations
# ── Version ───────────────────────────────────────────────────────────────
__version__ = "1.1.0"
# ── Size caps ─────────────────────────────────────────────────────────────
MAX_REQUEST_BODY_BYTES = 100 * 1024 * 1024 # 100 MB — inbound browser body
MAX_RESPONSE_BODY_BYTES = 200 * 1024 * 1024 # 200 MB — chunked response cap
MAX_HEADER_BYTES = 64 * 1024 # 64 KB
# ── Timeouts (seconds) ────────────────────────────────────────────────────
CLIENT_IDLE_TIMEOUT = 120
RELAY_TIMEOUT = 25
TLS_CONNECT_TIMEOUT = 15
TCP_CONNECT_TIMEOUT = 10
# ── Response cache ────────────────────────────────────────────────────────
CACHE_MAX_MB = 50
CACHE_TTL_STATIC_LONG = 3600 # images / fonts
CACHE_TTL_STATIC_MED = 1800 # css / js
CACHE_TTL_MAX = 86400 # hard cap on any explicit max-age
# ── Connection pool (HTTP/1.1 to Apps Script) ─────────────────────────────
POOL_MAX = 50
POOL_MIN_IDLE = 15
CONN_TTL = 45.0
SEMAPHORE_MAX = 50
WARM_POOL_COUNT = 30
# ── Batch windows ─────────────────────────────────────────────────────────
BATCH_WINDOW_MICRO = 0.005 # 5 ms
BATCH_WINDOW_MACRO = 0.050 # 50 ms
BATCH_MAX = 50
# ── Direct Google tunnel allow / exclude ──────────────────────────────────
# Google web-apps whose real origin must go through the Apps Script relay
# because direct SNI tunneling to them does not work reliably behind DPI.
GOOGLE_DIRECT_EXACT_EXCLUDE = frozenset({
"gemini.google.com",
"aistudio.google.com",
"notebooklm.google.com",
"labs.google.com",
"meet.google.com",
"accounts.google.com",
"ogs.google.com",
"mail.google.com",
"calendar.google.com",
"drive.google.com",
"docs.google.com",
"chat.google.com",
"photos.google.com",
"maps.google.com",
"myaccount.google.com",
"contacts.google.com",
"classroom.google.com",
"keep.google.com",
"play.google.com",
})
GOOGLE_DIRECT_SUFFIX_EXCLUDE: tuple[str, ...] = (
".meet.google.com",
)
# Hosts that are known to work better when tunneled directly.
GOOGLE_DIRECT_ALLOW_EXACT = frozenset({
"www.google.com",
"google.com",
"safebrowsing.google.com",
})
GOOGLE_DIRECT_ALLOW_SUFFIXES: tuple[str, ...] = ()
# ── Google-owned domain detection ─────────────────────────────────────────
GOOGLE_OWNED_SUFFIXES: tuple[str, ...] = (
".google.com", ".google.co",
".googleapis.com", ".gstatic.com",
".googleusercontent.com",
)
GOOGLE_OWNED_EXACT = frozenset({
"google.com", "gstatic.com", "googleapis.com",
})
# ── SNI-rewrite suffixes ──────────────────────────────────────────────────
# Google-owned properties whose real SNI is DPI-blocked but are served by
# the same edge IP as `front_domain`. Routed through the configured
# `google_ip` with SNI rewritten.
SNI_REWRITE_SUFFIXES: tuple[str, ...] = (
"youtube.com",
"youtu.be",
"youtube-nocookie.com",
"ytimg.com",
"ggpht.com",
"gvt1.com",
"gvt2.com",
"doubleclick.net",
"googlesyndication.com",
"googleadservices.com",
"google-analytics.com",
"googletagmanager.com",
"googletagservices.com",
"fonts.googleapis.com",
)
# ── Response-logging trace hosts ──────────────────────────────────────────
TRACE_HOST_SUFFIXES: tuple[str, ...] = (
"chatgpt.com",
"openai.com",
"gemini.google.com",
"google.com",
"cloudflare.com",
"challenges.cloudflare.com",
"turnstile",
)
# ── File-extension heuristics ─────────────────────────────────────────────
STATIC_EXTS: tuple[str, ...] = (
".css", ".js", ".mjs", ".woff", ".woff2", ".ttf", ".eot",
".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico",
".mp3", ".mp4", ".webm", ".wasm", ".avif",
)
LARGE_FILE_EXTS = frozenset({
".zip", ".tar", ".gz", ".bz2", ".xz", ".7z", ".rar",
".exe", ".msi", ".dmg", ".deb", ".rpm", ".apk",
".iso", ".img",
".mp4", ".mkv", ".avi", ".mov", ".webm",
".mp3", ".flac", ".wav", ".aac",
".pdf", ".doc", ".docx", ".ppt", ".pptx",
".wasm",
})
# ── Stateful-request hints ────────────────────────────────────────────────
STATEFUL_HEADER_NAMES: tuple[str, ...] = (
"cookie", "authorization", "proxy-authorization",
"origin", "referer", "if-none-match", "if-modified-since",
"cache-control", "pragma",
)
UNCACHEABLE_HEADER_NAMES: tuple[str, ...] = (
"cookie", "authorization", "proxy-authorization", "range",
"if-none-match", "if-modified-since", "cache-control", "pragma",
)
+104 -53
View File
@@ -11,7 +11,6 @@ returns the response.
import asyncio
import base64
import hashlib
import gzip
import json
import logging
import re
@@ -19,15 +18,27 @@ import ssl
import time
from urllib.parse import urlparse
import codec
from constants import (
BATCH_MAX,
BATCH_WINDOW_MACRO,
BATCH_WINDOW_MICRO,
CONN_TTL,
POOL_MAX,
POOL_MIN_IDLE,
RELAY_TIMEOUT,
SEMAPHORE_MAX,
STATEFUL_HEADER_NAMES,
STATIC_EXTS,
TLS_CONNECT_TIMEOUT,
WARM_POOL_COUNT,
)
log = logging.getLogger("Fronter")
class DomainFronter:
_STATIC_EXTS = (
".css", ".js", ".mjs", ".woff", ".woff2", ".ttf", ".eot",
".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico",
".mp3", ".mp4", ".webm", ".wasm", ".avif",
)
_STATIC_EXTS = STATIC_EXTS
def __init__(self, config: dict):
self.connect_host = config.get("google_ip", "216.239.38.120")
@@ -46,22 +57,25 @@ class DomainFronter:
# Connection pool — TTL-based, pre-warmed, with concurrency control
self._pool: list[tuple[asyncio.StreamReader, asyncio.StreamWriter, float]] = []
self._pool_lock = asyncio.Lock()
self._pool_max = 50
self._conn_ttl = 45.0 # seconds before a pooled conn is discarded
self._semaphore = asyncio.Semaphore(50) # max concurrent relay connections
self._pool_max = POOL_MAX
self._conn_ttl = CONN_TTL
self._semaphore = asyncio.Semaphore(SEMAPHORE_MAX)
self._warmed = False
self._refilling = False # background pool refill in progress
self._pool_min_idle = 15 # maintain at least this many idle connections
self._refilling = False
self._pool_min_idle = POOL_MIN_IDLE
self._maintenance_task: asyncio.Task | None = None
self._keepalive_task: asyncio.Task | None = None
self._warm_task: asyncio.Task | None = None
self._bg_tasks: set[asyncio.Task] = set()
# Batch collector for grouping concurrent relay() calls
self._batch_lock = asyncio.Lock()
self._batch_pending: list[tuple[dict, asyncio.Future]] = []
self._batch_task: asyncio.Task | None = None
self._batch_window_micro = 0.005 # 5ms micro-window (single req)
self._batch_window_macro = 0.050 # 50ms macro-window (burst traffic)
self._batch_max = 50 # max requests per batch
self._batch_enabled = True # disabled on first batch API failure
self._batch_window_micro = BATCH_WINDOW_MICRO
self._batch_window_macro = BATCH_WINDOW_MACRO
self._batch_max = BATCH_MAX
self._batch_enabled = True
# Request coalescing — dedup concurrent identical GETs
self._coalesce: dict[str, list[asyncio.Future]] = {}
@@ -79,6 +93,9 @@ class DomainFronter:
except ImportError:
pass
# Capability log for content encodings.
log.info("Response codecs: %s", codec.supported_encodings())
# ── helpers ───────────────────────────────────────────────────
def _ssl_ctx(self) -> ssl.SSLContext:
@@ -115,11 +132,13 @@ class DomainFronter:
writer.close()
except Exception:
pass
reader, writer = await asyncio.wait_for(self._open(), timeout=10)
reader, writer = await asyncio.wait_for(
self._open(), timeout=TLS_CONNECT_TIMEOUT
)
# Pool was empty — trigger aggressive background refill
if not self._refilling:
self._refilling = True
asyncio.create_task(self._refill_pool())
self._spawn(self._refill_pool())
return reader, writer, asyncio.get_event_loop().time()
async def _release(self, reader, writer, created):
@@ -248,13 +267,37 @@ class DomainFronter:
if self._warmed:
return
self._warmed = True
asyncio.create_task(self._do_warm())
self._warm_task = self._spawn(self._do_warm())
# Start continuous pool maintenance
if self._maintenance_task is None:
self._maintenance_task = asyncio.create_task(self._pool_maintenance())
self._maintenance_task = self._spawn(self._pool_maintenance())
# Start H2 connection (runs alongside H1 pool)
if self._h2:
asyncio.create_task(self._h2_connect_and_warm())
self._spawn(self._h2_connect_and_warm())
def _spawn(self, coro) -> asyncio.Task:
"""Create a task and keep a strong reference for clean cancellation."""
task = asyncio.create_task(coro)
self._bg_tasks.add(task)
task.add_done_callback(self._bg_tasks.discard)
return task
async def close(self):
"""Cancel background tasks and close all pooled / H2 connections."""
for task in list(self._bg_tasks):
task.cancel()
if self._bg_tasks:
self._spawn(self._prewarm_script())
if self._keepalive_task is None or self._keepalive_task.done():
self._keepalive_task = self._spawn
await self._flush_pool()
if self._h2:
try:
await self._h2.close()
except Exception as exc:
log.debug("h2 close: %s", exc)
async def _h2_connect(self):
"""Connect the HTTP/2 transport in background."""
@@ -351,7 +394,7 @@ class DomainFronter:
log.debug("Keepalive failed: %s", e)
async def _do_warm(self):
"""Open connections in parallel — failures are fine."""
"""Open WARM_POOL_COUNTnnections in parallel — failures are fine."""
count = 30
coros = [self._add_conn_to_pool() for _ in range(count)]
results = await asyncio.gather(*coros, return_exceptions=True)
@@ -401,29 +444,43 @@ class DomainFronter:
return await self._batch_submit(payload)
async def _coalesced_submit(self, url: str, payload: dict) -> bytes:
"""Dedup concurrent requests for the same URL (no Range header)."""
if url in self._coalesce:
# Another task is already fetching this URL — wait for it
future = asyncio.get_event_loop().create_future()
self._coalesce[url].append(future)
log.debug("Coalesced request: %s", url[:60])
"""Dedup concurrent requests for the same URL (no Range header).
Uses `_batch_lock` to atomically check-and-append, preventing a
race where the owning task's `finally` pops the entry between
the check and append by a second task.
"""
loop = asyncio.get_event_loop()
async with self._batch_lock:
waiters = self._coalesce.get(url)
if waiters is not None:
future = loop.create_future()
waiters.append(future)
log.debug("Coalesced request: %s", url[:60])
waiting = True
else:
self._coalesce[url] = []
waiting = False
if waiting:
return await future
self._coalesce[url] = []
try:
result = await self._batch_submit(payload)
# Resolve all waiters
for f in self._coalesce.get(url, []):
if not f.done():
f.set_result(result)
return result
except Exception as e:
for f in self._coalesce.get(url, []):
async with self._batch_lock:
waiters = self._coalesce.pop(url, [])
for f in waiters:
if not f.done():
f.set_exception(e)
raise
finally:
self._coalesce.pop(url, None)
async with self._batch_lock:
waiters = self._coalesce.pop(url, [])
for f in waiters:
if not f.done():
f.set_result(result)
return result
async def relay_parallel(self, method: str, url: str,
headers: dict, body: bytes = b"",
@@ -621,11 +678,7 @@ class DomainFronter:
return True
if headers:
for name in (
"cookie", "authorization", "proxy-authorization",
"origin", "referer", "if-none-match", "if-modified-since",
"cache-control", "pragma",
):
for name in STATEFUL_HEADER_NAMES:
if cls._header_value(headers, name):
return True
@@ -659,10 +712,10 @@ class DomainFronter:
if self._batch_task and not self._batch_task.done():
self._batch_task.cancel()
self._batch_task = None
asyncio.create_task(self._batch_send(batch))
self._spawn(self._batch_send(batch))
elif self._batch_task is None or self._batch_task.done():
# First request in a new batch window — start timer
self._batch_task = asyncio.create_task(self._batch_timer())
self._batch_task = self._spawn(self._batch_timer())
return await future
@@ -682,7 +735,7 @@ class DomainFronter:
batch = self._batch_pending[:]
self._batch_pending.clear()
self._batch_task = None
asyncio.create_task(self._batch_send(batch))
self._spawn(self._batch_send(batch))
return
# Tier 2: burst detected — wait more to accumulate
@@ -692,7 +745,7 @@ class DomainFronter:
batch = self._batch_pending[:]
self._batch_pending.clear()
self._batch_task = None
asyncio.create_task(self._batch_send(batch))
self._spawn(self._batch_send(batch))
async def _batch_send(self, batch: list):
"""Send a batch of requests. Uses fetchAll for multi, single for one."""
@@ -741,7 +794,7 @@ class DomainFronter:
for attempt in range(2):
try:
return await asyncio.wait_for(
self._relay_single_h2(payload), timeout=25
self._relay_single_h2(payload), timeout=RELAY_TIMEOUT
)
except Exception as e:
if attempt == 0:
@@ -759,7 +812,7 @@ class DomainFronter:
for attempt in range(2):
try:
return await asyncio.wait_for(
self._relay_single(payload), timeout=25
self._relay_single(payload), timeout=RELAY_TIMEOUT
)
except Exception as e:
if attempt == 0:
@@ -1018,12 +1071,10 @@ class DomainFronter:
except asyncio.TimeoutError:
break
# Auto-decompress gzip from Google frontend
if headers.get("content-encoding", "").lower() == "gzip":
try:
body = gzip.decompress(body)
except Exception:
pass # not actually gzip, use as-is
# Auto-decompress (gzip/deflate/br/zstd) from Google frontend
enc = headers.get("content-encoding", "")
if enc:
body = codec.decode(body, enc)
return status, headers, body
+25 -15
View File
@@ -15,12 +15,13 @@ Requires: pip install h2
"""
import asyncio
import gzip
import logging
import socket
import ssl
from urllib.parse import urlparse
import codec
log = logging.getLogger("H2")
try:
@@ -151,9 +152,11 @@ class H2Transport:
# Connection-level flow control: ~16MB window
self._h2.increment_flow_control_window(2 ** 24 - 65535)
# Per-stream settings: 1MB initial window, disable server push
# Per-stream settings: 8MB initial window (covers all typical relay
# request bodies in one shot so we never have to stall for a
# WINDOW_UPDATE mid-send). Disable server push.
self._h2.update_settings({
h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 1 * 1024 * 1024,
h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 8 * 1024 * 1024,
h2.settings.SettingCodes.ENABLE_PUSH: 0,
})
@@ -246,7 +249,7 @@ class H2Transport:
(":path", path),
(":authority", host),
(":scheme", "https"),
("accept-encoding", "gzip"),
("accept-encoding", codec.supported_encodings()),
]
if headers:
for k, v in headers.items():
@@ -279,30 +282,37 @@ class H2Transport:
if state.error:
raise ConnectionError(f"H2 stream error: {state.error}")
# Auto-decompress gzip
# Auto-decompress (gzip / deflate / brotli / zstd)
resp_body = bytes(state.data)
if state.headers.get("content-encoding", "").lower() == "gzip":
try:
resp_body = gzip.decompress(resp_body)
except Exception:
pass
enc = state.headers.get("content-encoding", "")
if enc:
resp_body = codec.decode(resp_body, enc)
return state.status, state.headers, resp_body
def _send_body(self, stream_id: int, body: bytes):
"""Send request body, respecting H2 flow control window."""
# For small bodies (typical JSON payloads), send in one shot
"""Send request body, respecting H2 flow control window.
The initial per-stream window is 8 MB (see _do_connect) which
comfortably covers all relay JSON payloads. If the body is ever
larger than the available window, we raise rather than silently
truncate the caller will retry on a fresh connection.
"""
sent = 0
total = len(body)
while body:
max_size = self._h2.local_settings.max_frame_size
window = self._h2.local_flow_control_window(stream_id)
send_size = min(len(body), max_size, window)
if send_size <= 0:
# Flow control full — let the reader loop process
# window updates before we continue
break
raise BufferError(
f"H2 flow control exhausted after {sent}/{total} bytes; "
f"increase initial window or shrink payload"
)
end = send_size >= len(body)
self._h2.send_data(stream_id, body[:send_size], end_stream=end)
body = body[send_size:]
sent += send_size
# ── Background reader ─────────────────────────────────────────
+248
View File
@@ -0,0 +1,248 @@
"""
Pretty, column-aligned, color-aware logging for MasterHttpRelayVPN.
Zero extra dependencies. On Windows, ANSI color support is enabled via
the Console API. Colors are disabled automatically when:
- The output stream is not a TTY (e.g. piped to a file)
- The NO_COLOR environment variable is set
- DFT_NO_COLOR=1 is set
"""
from __future__ import annotations
import logging
import os
import sys
import time
# ─── ANSI palette ──────────────────────────────────────────────────────────
RESET = "\x1b[0m"
BOLD = "\x1b[1m"
DIM = "\x1b[2m"
ITALIC = "\x1b[3m"
# 8-bit / truecolor friendly foreground codes
FG_GRAY = "\x1b[38;5;245m"
FG_BLUE = "\x1b[38;5;39m"
FG_CYAN = "\x1b[38;5;45m"
FG_GREEN = "\x1b[38;5;42m"
FG_YELLOW = "\x1b[38;5;214m"
FG_RED = "\x1b[38;5;203m"
FG_MAGENTA = "\x1b[38;5;177m"
FG_PURPLE = "\x1b[38;5;141m"
FG_TEAL = "\x1b[38;5;80m"
FG_ORANGE = "\x1b[38;5;208m"
LEVEL_STYLE = {
"DEBUG": f"{DIM}{FG_GRAY}",
"INFO": f"{FG_GREEN}",
"WARNING": f"{BOLD}{FG_YELLOW}",
"ERROR": f"{BOLD}{FG_RED}",
"CRITICAL": f"{BOLD}{FG_MAGENTA}",
}
LEVEL_GLYPH = {
"DEBUG": "·",
"INFO": "",
"WARNING": "!",
"ERROR": "",
"CRITICAL": "",
}
LEVEL_LABEL = {
"DEBUG": "DEBUG",
"INFO": "INFO ",
"WARNING": "WARN ",
"ERROR": "ERROR",
"CRITICAL": "CRIT ",
}
# Stable per-component color (keeps log scanning easy).
COMPONENT_COLORS = {
"Main": FG_CYAN,
"Proxy": FG_BLUE,
"Fronter": FG_PURPLE,
"H2": FG_TEAL,
"MITM": FG_ORANGE,
"Cert": FG_MAGENTA,
}
# ─── color support detection ───────────────────────────────────────────────
def _supports_color(stream) -> bool:
if os.environ.get("NO_COLOR"):
return False
if os.environ.get("DFT_NO_COLOR") == "1":
return False
if os.environ.get("FORCE_COLOR") or os.environ.get("DFT_FORCE_COLOR"):
return True
if not hasattr(stream, "isatty") or not stream.isatty():
return False
if sys.platform != "win32":
return True
# Try to enable ANSI on Windows 10+ consoles.
try:
import ctypes
kernel32 = ctypes.windll.kernel32
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
# -11 = STD_OUTPUT_HANDLE
handle = kernel32.GetStdHandle(-11)
mode = ctypes.c_ulong()
if not kernel32.GetConsoleMode(handle, ctypes.byref(mode)):
return False
if kernel32.SetConsoleMode(
handle, mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING
):
return True
except Exception:
return False
return False
# ─── formatter ─────────────────────────────────────────────────────────────
class PrettyFormatter(logging.Formatter):
"""Column-aligned formatter with optional ANSI colors."""
COMPONENT_WIDTH = 8
def __init__(self, *, use_color: bool):
super().__init__()
self.use_color = use_color
self._start = time.time()
# -- helpers ------------------------------------------------------------
def _c(self, code: str) -> str:
return code if self.use_color else ""
def _fmt_time(self, record: logging.LogRecord) -> str:
t = time.localtime(record.created)
ms = int((record.created - int(record.created)) * 1000)
return f"{time.strftime('%H:%M:%S', t)}.{ms:03d}"
def _fmt_level(self, levelname: str) -> str:
label = LEVEL_LABEL.get(levelname, levelname[:5].ljust(5))
glyph = LEVEL_GLYPH.get(levelname, "·")
style = LEVEL_STYLE.get(levelname, "")
if self.use_color:
return f"{style}{glyph} {label}{RESET}"
return f"{glyph} {label}"
def _fmt_component(self, name: str) -> str:
label = name[: self.COMPONENT_WIDTH].ljust(self.COMPONENT_WIDTH)
if not self.use_color:
return f"[{label}]"
color = COMPONENT_COLORS.get(name, FG_GRAY)
return f"{DIM}[{RESET}{color}{label}{RESET}{DIM}]{RESET}"
def format(self, record: logging.LogRecord) -> str:
# Pre-render message (honors %-args and {}-args).
try:
message = record.getMessage()
except Exception:
message = record.msg
time_part = self._fmt_time(record)
level_part = self._fmt_level(record.levelname)
comp_part = self._fmt_component(record.name)
if self.use_color:
time_part = f"{DIM}{FG_GRAY}{time_part}{RESET}"
line = f"{time_part} {level_part} {comp_part} {message}"
# Exception tracebacks: render dimmed below the main line.
if record.exc_info:
tb = self.formatException(record.exc_info)
if self.use_color:
tb = f"{DIM}{FG_GRAY}{tb}{RESET}"
line = f"{line}\n{tb}"
if record.stack_info:
si = record.stack_info
if self.use_color:
si = f"{DIM}{FG_GRAY}{si}{RESET}"
line = f"{line}\n{si}"
return line
# ─── public API ────────────────────────────────────────────────────────────
def configure(level: str = "INFO", *, stream=None) -> None:
"""Install the pretty formatter on the root logger.
Safe to call multiple times; replaces prior handlers set up by this
module and leaves unrelated handlers alone (for tests / embedding).
"""
stream = stream or sys.stderr
use_color = _supports_color(stream)
handler = logging.StreamHandler(stream)
handler.setFormatter(PrettyFormatter(use_color=use_color))
handler.set_name("mhrvpn.pretty")
root = logging.getLogger()
root.setLevel(getattr(logging, level.upper(), logging.INFO))
# Remove previous pretty handler(s) we installed.
for h in list(root.handlers):
if getattr(h, "name", "") == "mhrvpn.pretty":
root.removeHandler(h)
root.addHandler(handler)
# Suppress cosmetic asyncio warning spam:
# "returning true from eof_received() has no effect when using ssl"
# It originates in Python's own StreamReaderProtocol when we wrap a
# stream in TLS via start_tls(); there's nothing actionable to do.
_install_asyncio_noise_filter()
class _AsyncioNoiseFilter(logging.Filter):
_SUPPRESSED = (
"returning true from eof_received() has no effect when using ssl",
)
def filter(self, record: logging.LogRecord) -> bool: # noqa: D401
try:
msg = record.getMessage()
except Exception:
return True
return not any(s in msg for s in self._SUPPRESSED)
def _install_asyncio_noise_filter() -> None:
f = _AsyncioNoiseFilter()
aio = logging.getLogger("asyncio")
# Don't stack duplicates on repeat configure() calls.
for existing in list(aio.filters):
if isinstance(existing, _AsyncioNoiseFilter):
aio.removeFilter(existing)
aio.addFilter(f)
def print_banner(version: str, *, stream=None) -> None:
"""Print a compact startup banner with color fallbacks."""
stream = stream or sys.stderr
color = _supports_color(stream)
def c(code: str) -> str:
return code if color else ""
title = "MasterHttpRelayVPN"
subtitle = f"Domain-Fronted Apps Script Relay · v{version}"
bar = "" * (len(title) + len(subtitle) + 7)
print(f"{c(DIM)}{c(FG_GRAY)}{bar}{c(RESET)}", file=stream)
print(
f" {c(BOLD)}{c(FG_CYAN)}{title}{c(RESET)}"
f" {c(DIM)}·{c(RESET)} {c(FG_GRAY)}{subtitle}{c(RESET)}",
file=stream,
)
print(f"{c(DIM)}{c(FG_GRAY)}{bar}{c(RESET)}", file=stream)
stream.flush()
+37 -5
View File
@@ -13,6 +13,7 @@ Requires: pip install cryptography
import datetime
import logging
import os
import re
import ssl
import tempfile
@@ -23,11 +24,26 @@ from cryptography.x509.oid import NameOID
log = logging.getLogger("MITM")
CA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ca")
# CA lives at the project root (../ca/ relative to this file in src/).
# The installed trusted root was generated there; keep using it.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.dirname(_THIS_DIR)
CA_DIR = os.path.join(_PROJECT_ROOT, "ca")
CA_KEY_FILE = os.path.join(CA_DIR, "ca.key")
CA_CERT_FILE = os.path.join(CA_DIR, "ca.crt")
# Filename-safe form of an SNI / hostname. Windows forbids colons,
# question marks, etc., so IPv6 literals (and stray Unicode) must be
# rewritten before they become part of a cached cert file path.
_UNSAFE_NAME_RE = re.compile(r"[^A-Za-z0-9._-]")
def _safe_domain_filename(domain: str) -> str:
cleaned = _UNSAFE_NAME_RE.sub("_", domain.strip(".").lower())
return cleaned[:120] or "unknown"
class MITMCertManager:
def __init__(self):
self._ca_key = None
@@ -95,6 +111,13 @@ class MITMCertManager:
serialization.NoEncryption(),
)
)
# Restrict the CA private key to the current user on POSIX.
# os.chmod is a no-op for permission bits on Windows.
if os.name == "posix":
try:
os.chmod(CA_KEY_FILE, 0o600)
except OSError:
pass
with open(CA_CERT_FILE, "wb") as f:
f.write(self._ca_cert.public_bytes(serialization.Encoding.PEM))
@@ -105,8 +128,9 @@ class MITMCertManager:
if domain not in self._ctx_cache:
key_pem, cert_pem = self._generate_domain_cert(domain)
cert_file = os.path.join(self._cert_dir, f"{domain}.crt")
key_file = os.path.join(self._cert_dir, f"{domain}.key")
safe = _safe_domain_filename(domain)
cert_file = os.path.join(self._cert_dir, f"{safe}.crt")
key_file = os.path.join(self._cert_dir, f"{safe}.key")
ca_pem = self._ca_cert.public_bytes(serialization.Encoding.PEM)
with open(cert_file, "wb") as f:
@@ -126,8 +150,16 @@ class MITMCertManager:
public_exponent=65537, key_size=2048
)
subject = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, domain),
x509.NameAttribute(NameOID.COMMON_NAME, domain[:64] or "unknown"),
])
# SAN: IP literal vs DNS name — x509.DNSName rejects IPv6 literals.
import ipaddress as _ipaddress
try:
san_entry = x509.IPAddress(_ipaddress.ip_address(domain))
except ValueError:
san_entry = x509.DNSName(domain)
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
x509.CertificateBuilder()
@@ -138,7 +170,7 @@ class MITMCertManager:
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=365))
.add_extension(
x509.SubjectAlternativeName([x509.DNSName(domain)]),
x509.SubjectAlternativeName([san_entry]),
critical=False,
)
.sign(self._ca_key, hashes.SHA256())
+247 -122
View File
@@ -15,11 +15,56 @@ import time
import ipaddress
from urllib.parse import urlparse
from constants import (
CACHE_MAX_MB,
CACHE_TTL_MAX,
CACHE_TTL_STATIC_LONG,
CACHE_TTL_STATIC_MED,
CLIENT_IDLE_TIMEOUT,
GOOGLE_DIRECT_ALLOW_EXACT,
GOOGLE_DIRECT_ALLOW_SUFFIXES,
GOOGLE_DIRECT_EXACT_EXCLUDE,
GOOGLE_DIRECT_SUFFIX_EXCLUDE,
GOOGLE_OWNED_EXACT,
GOOGLE_OWNED_SUFFIXES,
LARGE_FILE_EXTS,
MAX_HEADER_BYTES,
MAX_REQUEST_BODY_BYTES,
SNI_REWRITE_SUFFIXES,
STATIC_EXTS,
TCP_CONNECT_TIMEOUT,
TRACE_HOST_SUFFIXES,
UNCACHEABLE_HEADER_NAMES,
)
from domain_fronter import DomainFronter
log = logging.getLogger("Proxy")
def _is_ip_literal(host: str) -> bool:
"""True for IPv4/IPv6 literals (strips brackets around IPv6)."""
h = host.strip("[]")
try:
ipaddress.ip_address(h)
return True
except ValueError:
return False
def _parse_content_length(header_block: bytes) -> int:
"""Return Content-Length or 0. Matches only the exact header name."""
for raw_line in header_block.split(b"\r\n"):
name, sep, value = raw_line.partition(b":")
if not sep:
continue
if name.strip().lower() == b"content-length":
try:
return int(value.strip())
except ValueError:
return 0
return 0
class ResponseCache:
"""Simple LRU response cache — avoids repeated relay calls."""
@@ -75,25 +120,20 @@ class ResponseCache:
# Explicit max-age
m = re.search(r"max-age=(\d+)", hdr)
if m:
return min(int(m.group(1)), 86400)
return min(int(m.group(1)), CACHE_TTL_MAX)
# Heuristic by content type / extension
path = url.split("?")[0].lower()
static_exts = (
".css", ".js", ".woff", ".woff2", ".ttf", ".eot",
".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico",
".mp3", ".mp4", ".wasm",
)
for ext in static_exts:
for ext in STATIC_EXTS:
if path.endswith(ext):
return 3600 # 1 hour for static assets
return CACHE_TTL_STATIC_LONG
ct_m = re.search(r"content-type:\s*([^\r\n]+)", hdr)
ct = ct_m.group(1) if ct_m else ""
if "image/" in ct or "font/" in ct:
return 3600
return CACHE_TTL_STATIC_LONG
if "text/css" in ct or "javascript" in ct:
return 1800
return CACHE_TTL_STATIC_MED
if "text/html" in ct or "application/json" in ct:
return 0 # don't cache dynamic content by default
@@ -101,45 +141,12 @@ class ResponseCache:
class ProxyServer:
_GOOGLE_DIRECT_EXACT_EXCLUDE = {
"gemini.google.com",
"aistudio.google.com",
"notebooklm.google.com",
"labs.google.com",
"meet.google.com",
"accounts.google.com",
"ogs.google.com",
"mail.google.com",
"calendar.google.com",
"drive.google.com",
"docs.google.com",
"chat.google.com",
"photos.google.com",
"maps.google.com",
"myaccount.google.com",
"contacts.google.com",
"classroom.google.com",
"keep.google.com",
"play.google.com",
}
_GOOGLE_DIRECT_SUFFIX_EXCLUDE = (
".meet.google.com",
)
_GOOGLE_DIRECT_ALLOW_EXACT = {
"www.google.com",
"google.com",
"safebrowsing.google.com",
}
_GOOGLE_DIRECT_ALLOW_SUFFIXES = ()
_TRACE_HOST_SUFFIXES = (
"chatgpt.com",
"openai.com",
"gemini.google.com",
"google.com",
"cloudflare.com",
"challenges.cloudflare.com",
"turnstile",
)
# Pulled from constants.py so users can override any subset via config.
_GOOGLE_DIRECT_EXACT_EXCLUDE = GOOGLE_DIRECT_EXACT_EXCLUDE
_GOOGLE_DIRECT_SUFFIX_EXCLUDE = GOOGLE_DIRECT_SUFFIX_EXCLUDE
_GOOGLE_DIRECT_ALLOW_EXACT = GOOGLE_DIRECT_ALLOW_EXACT
_GOOGLE_DIRECT_ALLOW_SUFFIXES = GOOGLE_DIRECT_ALLOW_SUFFIXES
_TRACE_HOST_SUFFIXES = TRACE_HOST_SUFFIXES
def __init__(self, config: dict):
self.host = config.get("listen_host", "127.0.0.1")
@@ -149,8 +156,9 @@ class ProxyServer:
self.socks_port = config.get("socks5_port", 1080)
self.fronter = DomainFronter(config)
self.mitm = None
self._cache = ResponseCache(max_mb=50)
self._cache = ResponseCache(max_mb=CACHE_MAX_MB)
self._direct_fail_until: dict[str, float] = {}
self._servers: list[asyncio.base_events.Server] = []
# hosts override — DNS fake-map: domain/suffix → IP
# Checked before any real DNS lookup; supports exact and suffix matching.
@@ -172,6 +180,14 @@ class ProxyServer:
)
}
# ── Per-host policy ────────────────────────────────────────
# block_hosts — refuse traffic entirely (close or 403)
# bypass_hosts — route directly (no MITM, no relay)
# Both accept exact hostnames and leading-dot suffix patterns,
# e.g. ".local" matches any *.local domain.
self._block_hosts = self._load_host_rules(config.get("block_hosts", []))
self._bypass_hosts = self._load_host_rules(config.get("bypass_hosts", []))
try:
from mitm import MITMCertManager
self.mitm = MITMCertManager()
@@ -180,6 +196,45 @@ class ProxyServer:
log.error("Run: pip install cryptography")
raise SystemExit(1)
# ── Host-policy helpers ───────────────────────────────────────
@staticmethod
def _load_host_rules(raw) -> tuple[set[str], tuple[str, ...]]:
"""Accept a list of host strings; return (exact_set, suffix_tuple).
A rule starting with '.' (e.g. ".internal") is a suffix rule.
Everything else is treated as an exact match. Case-insensitive.
"""
exact: set[str] = set()
suffixes: list[str] = []
for item in raw or []:
h = str(item).strip().lower().rstrip(".")
if not h:
continue
if h.startswith("."):
suffixes.append(h)
else:
exact.add(h)
return exact, tuple(suffixes)
@staticmethod
def _host_matches_rules(host: str,
rules: tuple[set[str], tuple[str, ...]]) -> bool:
exact, suffixes = rules
h = host.lower().rstrip(".")
if h in exact:
return True
for s in suffixes:
if h.endswith(s):
return True
return False
def _is_blocked(self, host: str) -> bool:
return self._host_matches_rules(host, self._block_hosts)
def _is_bypassed(self, host: str) -> bool:
return self._host_matches_rules(host, self._bypass_hosts)
@staticmethod
def _header_value(headers: dict | None, name: str) -> str:
if not headers:
@@ -193,10 +248,7 @@ class ProxyServer:
headers: dict | None, body: bytes) -> bool:
if method.upper() != "GET" or body:
return False
for name in (
"cookie", "authorization", "proxy-authorization", "range",
"if-none-match", "if-modified-since", "cache-control", "pragma",
):
for name in UNCACHEABLE_HEADER_NAMES:
if self._header_value(headers, name):
return False
return self.fronter._is_static_asset_url(url)
@@ -249,6 +301,8 @@ class ProxyServer:
log.error("SOCKS5 listener failed on %s:%d: %s",
self.socks_host, self.socks_port, e)
self._servers = [s for s in (http_srv, socks_srv) if s]
log.info(
"HTTP proxy listening on %s:%d",
self.host, self.port,
@@ -269,6 +323,24 @@ class ProxyServer:
else:
await http_srv.serve_forever()
async def stop(self):
"""Shut down all listeners and release relay resources."""
for srv in self._servers:
try:
srv.close()
except Exception:
pass
for srv in self._servers:
try:
await srv.wait_closed()
except Exception:
pass
self._servers = []
try:
await self.fronter.close()
except Exception as exc:
log.debug("fronter.close: %s", exc)
# ── client handler ────────────────────────────────────────────
async def _on_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
@@ -283,6 +355,9 @@ class ProxyServer:
while True:
line = await asyncio.wait_for(reader.readline(), timeout=10)
header_block += line
if len(header_block) > MAX_HEADER_BYTES:
log.warning("Request header block exceeds cap — closing")
return
if line in (b"\r\n", b"\n", b""):
break
@@ -397,6 +472,54 @@ class ProxyServer:
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
"""Route a target connection through the Apps Script relay."""
# ── Block / bypass policy ─────────────────────────────────
if self._is_blocked(host):
log.warning("BLOCKED → %s:%d (matches block_hosts)", host, port)
try:
writer.write(b"HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\n\r\n")
await writer.drain()
except Exception:
pass
return
if self._is_bypassed(host):
log.info("Bypass tunnel → %s:%d (matches bypass_hosts)", host, port)
await self._do_direct_tunnel(host, port, reader, writer)
return
# ── IP-literal destinations ───────────────────────────────
# Prefer a direct tunnel first (works for unblocked IPs and keeps
# TLS end-to-end). If the network blocks the route (common for
# Telegram data-centers behind DPI), fall back to:
# • port 443 → MITM + relay through Apps Script
# • port 80 → plain-HTTP relay through Apps Script
# • other → give up (non-HTTP; can't be relayed)
# We remember per-IP failures for a short while so subsequent
# connects skip the doomed direct attempt.
if _is_ip_literal(host):
if not self._direct_temporarily_disabled(host):
log.info("Direct tunnel → %s:%d (IP literal)", host, port)
ok = await self._do_direct_tunnel(host, port, reader, writer)
if ok:
return
self._remember_direct_failure(host, ttl=300)
if port not in (80, 443):
log.warning("Direct tunnel failed for %s:%d", host, port)
return
log.warning(
"Direct tunnel fallback → %s:%d (switching to relay)",
host, port,
)
else:
log.info(
"Relay fallback → %s:%d (direct temporarily disabled)",
host, port,
)
if port == 443:
await self._do_mitm_connect(host, port, reader, writer)
elif port == 80:
await self._do_plain_http_tunnel(host, port, reader, writer)
return
override_ip = self._sni_rewrite_ip(host)
if override_ip:
@@ -429,34 +552,22 @@ class ProxyServer:
await self._do_plain_http_tunnel(host, port, reader, writer)
elif port == 443:
await self._do_mitm_connect(host, port, reader, writer)
else:
elif port == 80:
await self._do_plain_http_tunnel(host, port, reader, writer)
else:
# Non-HTTP port (e.g. mtalk:5228 XMPP, IMAP, SMTP, SSH) —
# payload isn't HTTP, so we can't relay or MITM. Tunnel bytes.
log.info("Direct tunnel → %s:%d (non-HTTP port)", host, port)
ok = await self._do_direct_tunnel(host, port, reader, writer)
if not ok:
log.warning("Direct tunnel failed for %s:%d", host, port)
# ── Hosts override (fake DNS) ─────────────────────────────────
# Built-in list of domains that must be reached via Google's frontend IP
# with SNI rewritten to `front_domain` (default: www.google.com).
# These are Google-owned services whose real SNI is DPI-blocked in some
# countries, but that Google serves from the same edge IP as www.google.com.
# Users don't need to configure anything — any host matching one of these
# suffixes is transparently SNI-rewritten to the configured `google_ip`.
# Config's "hosts" map still takes precedence (for custom overrides).
_SNI_REWRITE_SUFFIXES = (
"youtube.com",
"youtu.be",
"youtube-nocookie.com",
"ytimg.com",
"ggpht.com",
"gvt1.com",
"gvt2.com",
"doubleclick.net",
"googlesyndication.com",
"googleadservices.com",
"google-analytics.com",
"googletagmanager.com",
"googletagservices.com",
"fonts.googleapis.com",
)
# Source: constants.SNI_REWRITE_SUFFIXES.
_SNI_REWRITE_SUFFIXES = SNI_REWRITE_SUFFIXES
def _sni_rewrite_ip(self, host: str) -> str | None:
"""Return the IP to SNI-rewrite `host` through, or None.
@@ -493,17 +604,12 @@ class ProxyServer:
# ── Google domain detection ───────────────────────────────────
# Only domains whose SNI the ISP does NOT block — direct tunnel is safe.
# YouTube/googlevideo SNIs are blocked; they go through _do_sni_rewrite_tunnel
# via the hosts map instead.
_GOOGLE_OWNED_SUFFIXES = (
".google.com", ".google.co",
".googleapis.com", ".gstatic.com",
".googleusercontent.com",
)
_GOOGLE_OWNED_EXACT = {
"google.com", "gstatic.com", "googleapis.com",
}
# Google-owned domains that may use the raw direct-tunnel shortcut.
# YouTube/googlevideo SNIs are blocked; they go through
# _do_sni_rewrite_tunnel via the hosts map instead.
# Source: constants.GOOGLE_OWNED_SUFFIXES / GOOGLE_OWNED_EXACT.
_GOOGLE_OWNED_SUFFIXES = GOOGLE_OWNED_SUFFIXES
_GOOGLE_OWNED_EXACT = GOOGLE_OWNED_EXACT
def _is_google_domain(self, host: str) -> bool:
"""Return True if host should use the raw direct Google shortcut."""
@@ -657,10 +763,16 @@ class ProxyServer:
except Exception as e:
log.debug("Pipe %s ended: %s", label, e)
finally:
# Half-close rather than hard-close so the other direction
# can still flush final bytes (important for TLS close_notify).
try:
dst.close()
if not dst.is_closing() and dst.can_write_eof():
dst.write_eof()
except Exception:
pass
try:
dst.close()
except Exception:
pass
await asyncio.gather(
pipe(reader, w_remote, f"client→{host}"),
@@ -759,13 +871,36 @@ class ProxyServer:
transport, protocol, ssl_ctx, server_side=True,
)
except Exception as e:
# Non-HTTPS traffic (e.g. MTProto, plain HTTP on port 80/443)
# routed through the proxy will always fail TLS — log at DEBUG
# to avoid alarming noise.
if port != 443:
log.debug("TLS handshake skipped for %s:%d (non-HTTPS): %s", host, port, e)
# TLS handshake failed. Common causes:
# • Telegram Desktop / MTProto over port 443 sends obfuscated
# non-TLS bytes — we literally cannot decrypt these, and
# since the target IP is blocked we can't direct-tunnel
# either. The only workaround is to configure Telegram as
# an HTTP proxy (not SOCKS5), so it sends hostnames our
# SNI-rewrite path can handle.
# • Client CONNECTs but never speaks TLS (some probes).
if _is_ip_literal(host) and port == 443:
log.warning(
"MITM TLS handshake failed for %s:%d (%s). "
"Likely non-TLS traffic (e.g. Telegram MTProto over "
"SOCKS5). Cannot relay raw TCP to a blocked IP — "
"use the HTTP proxy instead so hostnames are preserved.",
host, port, e,
)
elif port != 443:
log.debug(
"TLS handshake skipped for %s:%d (non-HTTPS): %s",
host, port, e,
)
else:
log.debug("TLS handshake failed for %s: %s", host, e)
# Close the client side so it fails fast and can retry, rather
# than hanging on a half-open connection.
try:
if not writer.is_closing():
writer.close()
except Exception:
pass
return
# Update writer to use the new TLS transport
@@ -778,7 +913,9 @@ class ProxyServer:
# Read and relay HTTP requests from the browser (now decrypted)
while True:
try:
first_line = await asyncio.wait_for(reader.readline(), timeout=120)
first_line = await asyncio.wait_for(
reader.readline(), timeout=CLIENT_IDLE_TIMEOUT
)
if not first_line:
break
@@ -786,18 +923,18 @@ class ProxyServer:
while True:
line = await asyncio.wait_for(reader.readline(), timeout=10)
header_block += line
if len(header_block) > MAX_HEADER_BYTES:
break
if line in (b"\r\n", b"\n", b""):
break
# Read body
body = b""
for raw_line in header_block.split(b"\r\n"):
if raw_line.lower().startswith(b"content-length:"):
length = int(raw_line.split(b":", 1)[1].strip())
if length > 100 * 1024 * 1024: # 100 MB cap
raise ValueError(f"Request body too large: {length} bytes")
body = await reader.readexactly(length)
break
length = _parse_content_length(header_block)
if length > MAX_REQUEST_BODY_BYTES:
raise ValueError(f"Request body too large: {length} bytes")
if length > 0:
body = await reader.readexactly(length)
# Parse the request
request_line = first_line.decode(errors="replace").strip()
@@ -982,18 +1119,8 @@ class ProxyServer:
def _is_likely_download(self, url: str, headers: dict) -> bool:
"""Heuristic: is this URL likely a large file download?"""
# Check file extension
path = url.split("?")[0].lower()
large_exts = {
".zip", ".tar", ".gz", ".bz2", ".xz", ".7z", ".rar",
".exe", ".msi", ".dmg", ".deb", ".rpm", ".apk",
".iso", ".img",
".mp4", ".mkv", ".avi", ".mov", ".webm",
".mp3", ".flac", ".wav", ".aac",
".pdf", ".doc", ".docx", ".ppt", ".pptx",
".wasm",
}
for ext in large_exts:
for ext in LARGE_FILE_EXTS:
if path.endswith(ext):
return True
return False
@@ -1002,15 +1129,13 @@ class ProxyServer:
async def _do_http(self, header_block: bytes, reader, writer):
body = b""
for raw_line in header_block.split(b"\r\n"):
if raw_line.lower().startswith(b"content-length:"):
length = int(raw_line.split(b":", 1)[1].strip())
if length > 100 * 1024 * 1024: # 100 MB cap
writer.write(b"HTTP/1.1 413 Content Too Large\r\n\r\n")
await writer.drain()
return
body = await reader.readexactly(length)
break
length = _parse_content_length(header_block)
if length > MAX_REQUEST_BODY_BYTES:
writer.write(b"HTTP/1.1 413 Content Too Large\r\n\r\n")
await writer.drain()
return
if length > 0:
body = await reader.readexactly(length)
first_line = header_block.split(b"\r\n")[0].decode(errors="replace")
log.info("HTTP → %s", first_line)