Merge branch 'python_testing' into feature/uninstall-cert

This commit is contained in:
mahan
2026-04-24 16:13:31 +03:30
committed by GitHub
12 changed files with 1651 additions and 268 deletions
+78 -2
View File
@@ -16,6 +16,28 @@ For the latest news, releases, and project updates, follow our Telegram channel:
---
### If you like this project, please support it by starring it on GitHub (⭐). It helps the project get discovered.
---
### Optional Financial Support 💸
- TON network:
`masterking32.ton`
- EVM-compatible networks (ETH and compatible chains):
`0x517f07305D6ED781A089322B6cD93d1461bF8652`
- TRC20 network (TRON):
`TLApdY8APWkFHHoxebxGY8JhMeChiETqFH`
Every contribution and every piece of feedback is appreciated. Support directly helps ongoing development and improvement.
---
## Disclaimer
MasterHttpRelayVPN is provided for educational, testing, and research purposes only.
@@ -263,12 +285,22 @@ This project focuses entirely on the **Apps Script** relay — a free Google acc
|---------|---------|-------------|
| `google_ip` | `216.239.38.120` | Google IP address to connect through |
| `front_domain` | `www.google.com` | Domain shown to the firewall/filter |
| `verify_ssl` | `true` | Verify TLS certificates |
| `verify_ssl` | `true` | Verify the TLS certificate on the local fronted connection to Google/CDN |
| `relay_timeout` | `25` | Total timeout for one relayed request before it fails |
| `tls_connect_timeout` | `15` | Timeout for the proxy's TLS connection to the fronted Google/CDN endpoint |
| `tcp_connect_timeout` | `10` | Timeout for direct TCP tunnels and outbound SNI-rewrite connects |
| `max_response_body_bytes` | `209715200` | Hard cap for a single relay response body after buffering/decoding |
| `script_ids` | — | Multiple Script IDs for load balancing (array) |
| `chunked_download_extensions` | see [config.example.json](config.example.json) | File extensions that should use parallel range downloading. Supports `".*"` to probe all GET downloads. |
| `chunked_download_min_size` | `5242880` | Minimum total file size (5 MB) before range-parallel download stays enabled |
| `chunked_download_chunk_size` | `524288` | Per-range chunk size used by parallel downloads |
| `chunked_download_max_parallel` | `8` | Maximum simultaneous range requests for one download |
| `chunked_download_max_chunks` | `256` | Soft upper bound for total chunk requests; chunk size is raised automatically for very large files |
| `block_hosts` | `[]` | Hosts that must never be tunneled (return HTTP 403). Supports exact names (`ads.example.com`) or leading-dot suffixes (`.doubleclick.net`). |
| `bypass_hosts` | `["localhost", ".local", ".lan", ".home.arpa"]` | Hosts that go direct (no MITM, no relay). Useful for LAN resources or sites that break under MITM. |
| `direct_google_exclude` | see [config.example.json](config.example.json) | Google apps that must use the MITM relay path instead of the fast direct tunnel. |
| `hosts` | `{}` | Manual DNS override: map a hostname to a specific IP. |
| `youtube_via_relay` | `false` | Route YouTube (`youtube.com`, `youtu.be`, `youtube-nocookie.com`) through the Apps Script relay instead of the SNI-rewrite path. The SNI-rewrite path uses Google's frontend IP which enforces SafeSearch and can cause **"Video Unavailable"** errors. Setting this to `true` fixes playback at the cost of using more Apps Script executions and slightly higher latency. |
### Optional Dependencies
@@ -280,7 +312,7 @@ Install everything from [`requirements.txt`](requirements.txt). All listed packa
| `h2` | HTTP/2 multiplexing to the Apps Script relay (significantly faster) |
| `brotli` | Decompression of `Content-Encoding: br` responses |
| `zstandard` | Decompression of `Content-Encoding: zstd` responses |
| `netifaces` | Better network interface detection for LAN sharing (fallback available without it) |
### Load Balancing
@@ -316,10 +348,53 @@ python3 main.py -c /path/to/config.json # Use a different config file
python3 main.py --install-cert # Install MITM CA certificate and exit
python3 main.py --uninstall-cert # Remove MITM CA certificate and exit
python3 main.py --no-cert-check # Skip automatic CA install check on startup
python3 main.py --scan # Scan Google IPs and find the fastest one
```
> **Auto-install:** On startup (MITM mode), the proxy automatically checks if the CA certificate is trusted and attempts to install it. Use `--no-cert-check` to skip this. If auto-install fails (e.g. needs elevation), run `python main.py --install-cert` manually or follow Step 6 above.
### Scanning for the Fastest Google IP
If your current `google_ip` in `config.json` is blocked or slow, you can scan to find a faster one:
```bash
python3 main.py --scan
```
This will:
1. Probe 27 candidate Google IPs in parallel
2. Measure latency from your network
3. Display results in a table
4. Recommend the fastest IP
5. Exit with exit code 0 if at least one IP is reachable, 1 otherwise
**Example output:**
```
Scanning 27 Google frontend IPs
SNI: www.google.com
Timeout: 4s per IP
Concurrency: 8 parallel probes
IP LATENCY STATUS
-------------------- ------------ -------------------------
216.239.32.120 42ms OK
216.239.34.120 45ms OK
216.239.36.120 52ms OK
142.250.80.142 timeout timeout
...
Result: 15 / 27 reachable
Top 3 fastest IPs:
1. 216.239.32.120 (42ms)
2. 216.239.34.120 (45ms)
3. 216.239.36.120 (52ms)
Recommended: Set "google_ip": "216.239.32.120" in config.json
```
After scanning, update your `config.json` with the recommended IP and restart the proxy.
---
## Architecture
@@ -356,6 +431,7 @@ MasterHttpRelayVPN/
├── mitm.py # On-the-fly TLS interception
├── cert_installer.py # Cross-platform CA installer (Windows/macOS/Linux + Firefox)
├── codec.py # Content-Encoding decoder (gzip/deflate/br/zstd)
├── google_ip_scanner.py # Scanner to find the fastest reachable Google IP
├── constants.py # Tunable defaults and shared data
└── logging_utils.py # Colored, aligned log formatter
```
+77 -2
View File
@@ -14,6 +14,28 @@
---
### اگر از پروژه راضی‌اید، با دادن ستاره (⭐) در گیت‌هاب از ما حمایت کنید — این کار به دیده‌شدن پروژه کمک می‌کند.
---
### حمایت مالی (اختیاری) 💸
- شبکه TON:
`masterking32.ton`
- آدرس روی شبکه‌های EVM (ETH و سازگارها):
`0x517f07305D6ED781A089322B6cD93d1461bF8652`
- شبکه TRC20 (TRON):
`TLApdY8APWkFHHoxebxGY8JhMeChiETqFH`
از هر نوع حمایت و بازخورد شما سپاسگزاریم — کمک‌ها برای توسعه و بهبود پروژه بسیار ارزشمند است.
---
## سلب مسئولیت
پروژه MasterHttpRelayVPN فقط برای اهداف آموزشی، تست و پژوهش ارائه شده است.
@@ -210,11 +232,21 @@ json
|------|---------------|-------|
| `google_ip` | `216.239.38.120` | IP مورد استفاده برای مسیر Google |
| `front_domain` | `www.google.com` | دامنه‌ای که فیلتر می‌بیند |
| `verify_ssl` | `true` | بررسی اعتبار TLS |
| `verify_ssl` | `true` | بررسی اعتبار TLS فقط برای اتصال fronted محلی به Google/CDN |
| `relay_timeout` | `25` | مهلت کل برای هر درخواست relay قبل از fail شدن |
| `tls_connect_timeout` | `15` | مهلت اتصال TLS پروکسی به endpoint fronted روی Google/CDN |
| `tcp_connect_timeout` | `10` | مهلت اتصال برای tunnel مستقیم و SNI-rewrite |
| `max_response_body_bytes` | `209715200` | سقف نهایی برای اندازه body هر پاسخ relay بعد از buffer/decode |
| `script_ids` | - | چند Deployment ID برای load balancing |
| `chunked_download_extensions` | مطابق [config.example.json](config.example.json) | پسوند فایل‌هایی که باید از دانلود range-parallel استفاده کنند. از `".*"` هم برای probe همه دانلودهای GET پشتیبانی می‌شود. |
| `chunked_download_min_size` | `5242880` | حداقل اندازه کل فایل (۵ مگابایت) برای فعال ماندن دانلود موازی |
| `chunked_download_chunk_size` | `524288` | اندازه هر chunk در دانلود موازی |
| `chunked_download_max_parallel` | `8` | حداکثر تعداد range request همزمان برای یک دانلود |
| `chunked_download_max_chunks` | `256` | سقف نرم برای تعداد کل chunk request ها؛ برای فایل‌های خیلی بزرگ اندازه chunk به‌صورت خودکار بیشتر می‌شود |
| `block_hosts` | `[]` | هاست‌هایی که هرگز نباید tunnel شوند (پاسخ 403). نام دقیق (`ads.example.com`) یا پسوند با نقطه‌ی ابتدایی (`.doubleclick.net`). |
| `bypass_hosts` | `["localhost", ".local", ".lan", ".home.arpa"]` | هاست‌هایی که مستقیم می‌روند (بدون MITM و بدون رله). برای منابع داخلی شبکه یا سایت‌هایی که با MITM مشکل دارند. |
| `direct_google_exclude` | مراجعه به [config.example.json](config.example.json) | اپ‌های Google که باید از مسیر MITM برای رله استفاده کنند به‌جای tunnel مستقیم. |
| `youtube_via_relay` | `false` | مسیردهی YouTube (`youtube.com`، `youtu.be`، `youtube-nocookie.com`) از طریق رله Apps Script به‌جای مسیر SNI-rewrite. مسیر SNI-rewrite از IP فرانت‌اند Google عبور می‌کند که SafeSearch را اجباری می‌کند و می‌تواند باعث خطای **«ویدیو در دسترس نیست»** شود. با فعال کردن این گزینه، پخش ویدیو درست می‌شود اما تعداد اجراهای Apps Script بیشتر و تأخیر اندکی بالاتر می‌رود. |
### وابستگی‌های اختیاری
@@ -226,7 +258,6 @@ json
| `h2` | ارتباط HTTP/2 با رله Apps Script (به‌طور محسوسی سریع‌تر) |
| `brotli` | پشتیبانی از فشرده‌سازی `Content-Encoding: br` |
| `zstandard` | پشتیبانی از فشرده‌سازی `Content-Encoding: zstd` |
| `netifaces` | تشخیص بهتر اینترفیس‌های شبکه برای اشتراک‌گذاری LAN (در صورت نبود آن، حالت جایگزین در دسترس است) |
### استفاده از چند Script ID
@@ -262,10 +293,53 @@ python3 main.py -c /path/to/config.json
python3 main.py --install-cert # نصب گواهی CA و خروج
python3 main.py --uninstall-cert # حذف گراهی CA و خروج
python3 main.py --no-cert-check # رد شدن از بررسی خودکار گواهی
python3 main.py --scan # اسکن IP های Google و یافتن سریع‌ترین
```
> **نصب خودکار:** هنگام اجرا در حالت `apps_script`، برنامه به‌طور خودکار بررسی می‌کند که آیا گواهی CA قابل اعتماد است یا نه و در صورت نیاز آن را نصب می‌کند. اگر نصب خودکار ناموفق بود (مثلاً نیاز به دسترسی مدیر دارد)، می‌توانید دستور `python main.py --install-cert` را اجرا کنید یا مراحل مرحله ۶ را دنبال کنید.
### اسکن کردن برای یافتن سریع‌ترین IP گوگل
اگر `google_ip` فعلی در `config.json` بلاک شده یا آهسته است، می‌توانید اسکن کنید تا سریع‌ترین آن را پیدا کنید:
```bash
python3 main.py --scan
```
این دستور:
1. ۲۷ IP برای fronting Google را به‌صورت موازی بررسی می‌کند
2. تأخیر (latency) از شبکه شما را اندازه می‌گیرد
3. نتایج را در جدول نمایش می‌دهد
4. سریع‌ترین IP را پیشنهاد می‌دهد
5. اگر حداقل یک IP در دسترس باشد کد خروج ۰، ورنه ۱ را برمی‌گرداند
**نمونه خروجی:**
```
Scanning 27 Google frontend IPs
SNI: www.google.com
Timeout: 4s per IP
Concurrency: 8 parallel probes
IP LATENCY STATUS
-------------------- ------------ -------------------------
216.239.32.120 42ms OK
216.239.34.120 45ms OK
216.239.36.120 52ms OK
142.250.80.142 timeout timeout
...
Result: 15 / 27 reachable
Top 3 fastest IPs:
1. 216.239.32.120 (42ms)
2. 216.239.34.120 (45ms)
3. 216.239.36.120 (52ms)
Recommended: Set "google_ip": "216.239.32.120" in config.json
```
پس از اسکن، مقدار `google_ip` در `config.json` را با IP پیشنهادی به‌روزرسانی کنید و پراکسی را دوباره راه‌اندازی کنید.
---
## معماری
@@ -298,6 +372,7 @@ MasterHttpRelayVPN/
├── mitm.py # ساخت و مدیریت گواهی‌ها
├── cert_installer.py # نصب خودکار CA در ویندوز/مک/لینوکس + فایرفاکس
├── codec.py # رمزگشای Content-Encoding (gzip/deflate/br/zstd)
├── google_ip_scanner.py # اسکنر IP های Google برای یافتن سریع‌ترین
├── constants.py # مقادیر پیش‌فرض قابل تنظیم
└── logging_utils.py # فرمت‌دهنده‌ی لاگ رنگی و منظم
```
+42
View File
@@ -11,7 +11,48 @@
"log_level": "INFO",
"verify_ssl": true,
"lan_sharing": true,
"relay_timeout": 25,
"tls_connect_timeout": 15,
"tcp_connect_timeout": 10,
"max_response_body_bytes": 209715200,
"parallel_relay": 1,
"chunked_download_extensions": [
".bin",
".zip",
".tar",
".gz",
".bz2",
".xz",
".7z",
".rar",
".exe",
".msi",
".dmg",
".deb",
".rpm",
".apk",
".iso",
".img",
".mp4",
".mkv",
".avi",
".mov",
".webm",
".mp3",
".flac",
".wav",
".aac",
".pdf",
".doc",
".docx",
".ppt",
".pptx",
".wasm"
],
"chunked_download_min_size": 5242880,
"chunked_download_chunk_size": 524288,
"chunked_download_max_parallel": 8,
"chunked_download_max_chunks": 256,
"block_hosts": [],
"bypass_hosts": [
"localhost",
@@ -42,5 +83,6 @@
"www.google.com",
"safebrowsing.google.com"
],
"youtube_via_relay": false,
"hosts": {}
}
+14 -1
View File
@@ -23,6 +23,7 @@ if _SRC_DIR not in sys.path:
from cert_installer import install_ca, uninstall_ca, is_ca_trusted
from constants import __version__
from lan_utils import log_lan_access
from google_ip_scanner import scan_sync
from logging_utils import configure as configure_logging, print_banner
from mitm import CA_CERT_FILE
from proxy_server import ProxyServer
@@ -97,6 +98,11 @@ def parse_args():
action="store_true",
help="Skip the certificate installation check on startup.",
)
parser.add_argument(
"--scan",
action="store_true",
help="Scan Google IPs to find the fastest reachable one and exit.",
)
return parser.parse_args()
@@ -207,6 +213,13 @@ def main():
_log.info("CA certificate removed successfully.")
else:
_log.warning("CA certificate removal may have failed. Check logs above.")
# ── Google IP Scanner ──────────────────────────────────────────────────
if args.scan:
setup_logging("INFO")
front_domain = config.get("front_domain", "www.google.com")
_log = logging.getLogger("Main")
_log.info(f"Scanning Google IPs (fronting domain: {front_domain})")
ok = scan_sync(front_domain)
sys.exit(0 if ok else 1)
setup_logging(config.get("log_level", "INFO"))
@@ -247,7 +260,7 @@ def main():
log.info("MITM CA is already trusted.")
# ── LAN sharing configuration ────────────────────────────────────────
lan_sharing = config.get("lan_sharing", True)
lan_sharing = config.get("lan_sharing", False)
if lan_sharing:
# If LAN sharing is enabled and host is still localhost, change to all interfaces
if config.get("listen_host", "127.0.0.1") == "127.0.0.1":
+5 -2
View File
@@ -7,11 +7,14 @@ cryptography>=41.0.0
# Optional: HTTP/2 multiplexing (faster apps_script relay)
h2>=4.1.0
# Optional: CA bundle for TLS verification (recommended on macOS / Windows)
certifi>=2024.1.0
# Optional: Brotli decompression (modern websites send `br` encoding)
brotli>=1.1.0
# Optional: Zstandard decompression (some CDNs now serve `zstd`)
zstandard>=0.22.0
# Optional: Better network interface detection for LAN sharing
netifaces>=0.11.0
# LAN interface detection now uses only the Python standard library
# (works on Windows, Linux, macOS, Android/Termux without a C compiler).
+18 -1
View File
@@ -86,6 +86,15 @@ def load_base_config() -> dict:
"socks5_port": 1080,
"log_level": "INFO",
"verify_ssl": True,
"lan_sharing": False,
"relay_timeout": 25,
"tls_connect_timeout": 15,
"tcp_connect_timeout": 10,
"max_response_body_bytes": 200 * 1024 * 1024,
"chunked_download_min_size": 5 * 1024 * 1024,
"chunked_download_chunk_size": 512 * 1024,
"chunked_download_max_parallel": 8,
"chunked_download_max_chunks": 256,
"hosts": {},
}
@@ -118,7 +127,15 @@ def configure_apps_script(cfg: dict) -> dict:
def configure_network(cfg: dict) -> dict:
print()
print(bold("Network settings") + dim(" (press enter to accept defaults)"))
cfg["listen_host"] = prompt("Listen host", default=str(cfg.get("listen_host", "127.0.0.1")))
cfg["lan_sharing"] = prompt_yes_no(
"Enable LAN sharing?",
default=bool(cfg.get("lan_sharing", False)),
)
default_host = str(cfg.get("listen_host", "127.0.0.1"))
if cfg["lan_sharing"] and default_host == "127.0.0.1":
default_host = "0.0.0.0"
cfg["listen_host"] = prompt("Listen host", default=default_host)
port = prompt("HTTP proxy port", default=str(cfg.get("listen_port", 8085)))
try:
+36
View File
@@ -23,6 +23,39 @@ RELAY_TIMEOUT = 25
TLS_CONNECT_TIMEOUT = 15
TCP_CONNECT_TIMEOUT = 10
# ── Google IP Scanner settings ──────────────────────────────────────────────
GOOGLE_SCANNER_TIMEOUT = 4 # Timeout per IP probe (seconds)
GOOGLE_SCANNER_CONCURRENCY = 8 # Parallel probes
# Candidate Google frontend IPs for scanning (multiple ASNs and regions)
CANDIDATE_IPS: tuple[str, ...] = (
"216.239.32.120",
"216.239.34.120",
"216.239.36.120",
"216.239.38.120",
"142.250.80.142",
"142.250.80.138",
"142.250.179.110",
"142.250.185.110",
"142.250.184.206",
"142.250.190.238",
"142.250.191.78",
"172.217.1.206",
"172.217.14.206",
"172.217.16.142",
"172.217.22.174",
"172.217.164.110",
"172.217.168.206",
"172.217.169.206",
"34.107.221.82",
"142.251.32.110",
"142.251.33.110",
"142.251.46.206",
"142.251.46.238",
"142.250.80.170",
"142.250.72.206",
"142.250.64.206",
"142.250.72.110",
)
# ── Response cache ────────────────────────────────────────────────────────
CACHE_MAX_MB = 50
@@ -66,6 +99,8 @@ FRONT_SNI_POOL_GOOGLE: tuple[str, ...] = (
"translate.google.com",
"play.google.com",
"lens.google.com",
"scholar.google.com",
"chromewebstore.google.com",
)
@@ -165,6 +200,7 @@ STATIC_EXTS: tuple[str, ...] = (
".mp3", ".mp4", ".webm", ".wasm", ".avif",
)
LARGE_FILE_EXTS = frozenset({
".bin",
".zip", ".tar", ".gz", ".bz2", ".xz", ".7z", ".rar",
".exe", ".msi", ".dmg", ".deb", ".rpm", ".apk",
".iso", ".img",
+709 -76
View File
File diff suppressed because it is too large Load Diff
+194
View File
@@ -0,0 +1,194 @@
"""
Google IP Scanner — finds the fastest reachable Google frontend IP.
Scans a list of candidate Google IPs via HTTPS (with SNI fronting), measures
latency, and reports results in a formatted table. Useful for finding the best
IP to configure in config.json when your current IP is blocked.
"""
from __future__ import annotations
import asyncio
import logging
import ssl
import time
from dataclasses import dataclass
from typing import Optional
from constants import CANDIDATE_IPS, GOOGLE_SCANNER_TIMEOUT, GOOGLE_SCANNER_CONCURRENCY
log = logging.getLogger("Scanner")
@dataclass
class ProbeResult:
"""Result of a single IP probe."""
ip: str
latency_ms: Optional[int] = None
error: Optional[str] = None
@property
def ok(self) -> bool:
return self.latency_ms is not None
async def _probe_ip(
ip: str,
sni: str,
semaphore: asyncio.Semaphore,
timeout: float,
) -> ProbeResult:
"""
Probe a single IP via HTTPS with SNI fronting.
Args:
ip: The IP to probe (xxx.xxx.xxx.xxx).
sni: The SNI hostname to use in TLS handshake.
semaphore: Rate limiter to control concurrency.
timeout: Timeout in seconds for the entire probe.
Returns:
ProbeResult with latency_ms (if successful) or error message.
"""
async with semaphore:
start_time = time.time()
try:
# Create SSL context that skips certificate verification
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# Connect to IP:443 with SNI set to the fronting domain
reader, writer = await asyncio.wait_for(
asyncio.open_connection(
ip,
443,
ssl=ctx,
server_hostname=sni,
),
timeout=timeout,
)
# Send minimal HTTP HEAD request
request = f"HEAD / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n"
writer.write(request.encode())
await writer.drain()
# Read response header (first 256 bytes is plenty for HTTP status)
response = await asyncio.wait_for(reader.read(256), timeout=timeout)
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
# Check if we got an HTTP response
if not response:
return ProbeResult(ip=ip, error="empty response")
response_str = response.decode("utf-8", errors="ignore")
if not response_str.startswith("HTTP/"):
return ProbeResult(ip=ip, error=f"invalid response: {response_str[:30]!r}")
# Success — return latency in milliseconds
elapsed_ms = int((time.time() - start_time) * 1000)
return ProbeResult(ip=ip, latency_ms=elapsed_ms)
except asyncio.TimeoutError:
return ProbeResult(ip=ip, error="timeout")
except ConnectionRefusedError:
return ProbeResult(ip=ip, error="connection refused")
except ConnectionResetError:
return ProbeResult(ip=ip, error="connection reset")
except OSError as e:
return ProbeResult(ip=ip, error=f"network error: {e.strerror or str(e)}")
except Exception as e:
return ProbeResult(ip=ip, error=f"probe failed: {type(e).__name__}")
async def run(front_domain: str) -> bool:
"""
Scan all candidate Google IPs and display results.
Args:
front_domain: The SNI hostname to use (e.g. "www.google.com").
Returns:
True if at least one IP is reachable, False otherwise.
"""
timeout = GOOGLE_SCANNER_TIMEOUT
concurrency = GOOGLE_SCANNER_CONCURRENCY
print()
print(f"Scanning {len(CANDIDATE_IPS)} Google frontend IPs")
print(f" SNI: {front_domain}")
print(f" Timeout: {timeout}s per IP")
print(f" Concurrency: {concurrency} parallel probes")
print()
# Create semaphore to limit concurrency
semaphore = asyncio.Semaphore(concurrency)
# Launch all probes concurrently
tasks = [
_probe_ip(ip, front_domain, semaphore, timeout)
for ip in CANDIDATE_IPS
]
results = await asyncio.gather(*tasks)
# Sort by latency (successful first, then by speed)
results.sort(key=lambda r: (not r.ok, r.latency_ms or float("inf")))
# Display results table
print(f"{'IP':<20} {'LATENCY':<12} {'STATUS':<25}")
print(f"{'-' * 20} {'-' * 12} {'-' * 25}")
ok_count = 0
for result in results:
if result.ok:
print(f"{result.ip:<20} {result.latency_ms:>8}ms OK")
ok_count += 1
else:
status = result.error or "unknown error"
print(f"{result.ip:<20} {'':<12} {status:<25}")
print()
print(f"Result: {ok_count} / {len(results)} reachable")
if ok_count == 0:
print("No Google IPs reachable from this network.")
print()
return False
# Show top 3 fastest
fastest = [r for r in results if r.ok][:3]
print()
print("Top 3 fastest IPs:")
for i, result in enumerate(fastest, 1):
print(f" {i}. {result.ip} ({result.latency_ms}ms)")
print()
print(f"Recommended: Set \"google_ip\": \"{fastest[0].ip}\" in config.json")
print()
return True
def scan_sync(front_domain: str) -> bool:
"""
Wrapper to run async scanner from sync context (e.g. main.py).
Args:
front_domain: The SNI hostname to use.
Returns:
True if at least one IP is reachable, False otherwise.
"""
try:
return asyncio.run(run(front_domain))
except KeyboardInterrupt:
print("\nScan interrupted by user.")
return False
except Exception as e:
log.error(f"Scan failed: {e}")
return False
+53 -17
View File
@@ -20,6 +20,11 @@ import socket
import ssl
from urllib.parse import urlparse
try:
import certifi
except Exception: # optional dependency fallback
certifi = None
import codec
log = logging.getLogger("H2")
@@ -80,6 +85,7 @@ class H2Transport:
self._write_lock = asyncio.Lock()
self._connect_lock = asyncio.Lock()
self._read_task: asyncio.Task | None = None
self._conn_generation = 0
# Per-stream tracking
self._streams: dict[int, _StreamState] = {}
@@ -106,6 +112,13 @@ class H2Transport:
async def _do_connect(self):
"""Establish the HTTP/2 connection with optimized socket settings."""
ctx = ssl.create_default_context()
# Some Python builds don't expose a usable default CA store.
# Load certifi bundle when present to keep TLS verification stable.
if certifi is not None:
try:
ctx.load_verify_locations(cafile=certifi.where())
except Exception:
pass
# Advertise both h2 and http/1.1 — some DPI blocks h2-only ALPN
ctx.set_alpn_protocols(["h2", "http/1.1"])
if not self.verify_ssl:
@@ -127,7 +140,7 @@ class H2Transport:
try:
await asyncio.wait_for(
asyncio.get_event_loop().sock_connect(
asyncio.get_running_loop().sock_connect(
raw, (self.connect_host, 443)
),
timeout=15,
@@ -174,26 +187,34 @@ class H2Transport:
await self._flush()
self._connected = True
self._read_task = asyncio.create_task(self._reader_loop())
self._conn_generation += 1
generation = self._conn_generation
self._read_task = asyncio.create_task(self._reader_loop(generation))
log.info("H2 connected → %s (SNI=%s, TCP_NODELAY=on)",
self.connect_host, sni)
async def reconnect(self):
"""Close current connection and re-establish."""
await self._close_internal()
await self._do_connect()
async with self._connect_lock:
await self._close_internal()
await self._do_connect()
async def _close_internal(self):
self._connected = False
if self._read_task:
self._read_task.cancel()
self._read_task = None
read_task = self._read_task
self._read_task = None
if read_task:
read_task.cancel()
await asyncio.gather(read_task, return_exceptions=True)
if self._writer:
try:
self._writer.close()
writer = self._writer
self._writer = None
writer.close()
await writer.wait_closed()
except Exception:
pass
self._writer = None
self._reader = None
# Wake all pending streams so they can raise
for state in self._streams.values():
state.error = "Connection closed"
@@ -327,7 +348,7 @@ class H2Transport:
# ── Background reader ─────────────────────────────────────────
async def _reader_loop(self):
async def _reader_loop(self, generation: int):
"""Background: read H2 frames, dispatch events to waiting streams."""
try:
while self._connected:
@@ -351,15 +372,30 @@ class H2Transport:
except asyncio.CancelledError:
pass
except ssl.SSLError as e:
# APPLICATION_DATA_AFTER_CLOSE_NOTIFY is raised when the server
# sends data after its TLS close_notify — technically a protocol
# violation but very common with CDNs. It just means the
# connection is closed; reconnect on the next request.
if "APPLICATION_DATA_AFTER_CLOSE_NOTIFY" in str(e):
log.debug("H2 TLS session closed by remote (close_notify): %s", e)
else:
log.error("H2 reader error: %s", e)
except Exception as e:
log.error("H2 reader error: %s", e)
if "application data after close notify" in str(e).lower():
log.debug("H2 reader closed after close_notify: %s", e)
else:
log.error("H2 reader error: %s", e)
finally:
self._connected = False
for state in self._streams.values():
if not state.done.is_set():
state.error = "Connection lost"
state.done.set()
log.info("H2 reader loop ended")
if generation != self._conn_generation:
log.debug("H2 reader loop ended for stale generation %d", generation)
else:
self._connected = False
for state in self._streams.values():
if not state.done.is_set():
state.error = "Connection lost"
state.done.set()
log.info("H2 reader loop ended")
def _dispatch(self, event):
"""Route a single h2 event to its stream."""
+82 -79
View File
@@ -1,101 +1,103 @@
"""
LAN utilities for detecting network interfaces and IP addresses.
LAN utilities for detecting network interfaces and IPv4 addresses.
Provides functionality to enumerate local network interfaces and their
associated IP addresses for LAN proxy sharing.
Provides functionality to enumerate local IPv4 addresses for LAN proxy
sharing. IPv6 is intentionally not reported — this project only exposes
the proxy over IPv4 LANs, which is what every consumer router and
phone/desktop client actually uses.
Implementation notes
--------------------
This module relies only on the Python standard library so it works
out-of-the-box on every supported OS (Windows, Linux, macOS,
Android/Termux, *BSD) without requiring a C compiler or native build
tools (previous versions depended on ``netifaces``, which needs
"Microsoft Visual C++ 14.0 or greater" on Windows and was a frequent
install blocker for users on slow connections).
Strategy (in order):
1. "UDP connect trick" to reliably discover the primary outbound
IPv4 address on any OS.
2. ``socket.getaddrinfo(hostname, AF_INET)`` to enumerate any additional
IPv4 addresses bound to the host (covers multi-homed machines).
"""
import ipaddress
import logging
import socket
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Set
log = logging.getLogger("LAN")
# ---------------------------------------------------------------------------
# Primary-IP discovery (UDP connect trick)
# ---------------------------------------------------------------------------
def _primary_ipv4() -> Optional[str]:
"""
Return the primary local IPv4 the OS would use for outbound traffic.
Uses a connected UDP socket which does *not* actually send packets —
the kernel just picks the source address from its routing table.
Works identically on Windows, Linux, macOS, and Android.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.settimeout(0.5)
# TEST-NET-1 address, port is arbitrary; no packet is sent for UDP connect().
s.connect(('192.0.2.1', 80))
return s.getsockname()[0]
except OSError:
return None
finally:
s.close()
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def get_network_interfaces() -> Dict[str, List[str]]:
"""
Get all network interfaces and their associated IP addresses.
Returns a dictionary mapping interface names to lists of IP addresses
(both IPv4 and IPv6). Only includes interfaces with valid IP addresses
that are not loopback.
Get network interfaces and their associated non-loopback IPv4 addresses.
Returns:
Dict[str, List[str]]: Interface name -> list of IP addresses
Dict[str, List[str]]: Interface label -> list of IPv4 addresses.
Labels are best-effort synthetic names such as ``"primary"``
and ``"host"``.
"""
interfaces = {}
interfaces: Dict[str, List[str]] = {}
seen_ips: Set[str] = set()
def _add(label: str, ip: Optional[str]) -> None:
if not ip or ip in seen_ips:
return
if ip.startswith('127.'):
return
seen_ips.add(ip)
interfaces.setdefault(label, []).append(ip)
# 1) Primary outbound IPv4 (most reliable, cross-platform).
_add('primary', _primary_ipv4())
# 2) Enumerate via hostname resolution (picks up multi-homed hosts).
try:
import netifaces
for iface in netifaces.interfaces():
addrs = netifaces.ifaddresses(iface)
ips = []
# IPv4 addresses
if netifaces.AF_INET in addrs:
for addr in addrs[netifaces.AF_INET]:
ip = addr.get('addr')
if ip and not ip.startswith('127.'):
ips.append(ip)
# IPv6 addresses (without scope)
if netifaces.AF_INET6 in addrs:
for addr in addrs[netifaces.AF_INET6]:
ip = addr.get('addr')
if ip and not ip.startswith('::1') and not '%' in ip:
# Remove scope if present
ips.append(ip.split('%')[0])
if ips:
interfaces[iface] = ips
except ImportError:
# Fallback to socket method for basic detection
log.debug("netifaces not available, using socket fallback")
interfaces = _get_interfaces_socket_fallback()
return interfaces
def _get_interfaces_socket_fallback() -> Dict[str, List[str]]:
"""
Fallback method to get network interfaces using socket.
This is less comprehensive than netifaces but works without extra dependencies.
"""
interfaces = {}
try:
# Get hostname and try to resolve to IPs
hostname = socket.gethostname()
try:
# Get IPv4 addresses
ipv4_info = socket.getaddrinfo(hostname, None, socket.AF_INET)
ipv4_addrs = [info[4][0] for info in ipv4_info if not info[4][0].startswith('127.')]
if ipv4_addrs:
interfaces['primary'] = list(set(ipv4_addrs)) # Remove duplicates
except socket.gaierror:
pass
except OSError:
hostname = ''
if hostname:
try:
# Get IPv6 addresses
ipv6_info = socket.getaddrinfo(hostname, None, socket.AF_INET6)
ipv6_addrs = []
for info in ipv6_info:
ip = info[4][0]
if not ip.startswith('::1') and not '%' in ip:
ipv6_addrs.append(ip.split('%')[0])
if ipv6_addrs:
interfaces['primary_ipv6'] = list(set(ipv6_addrs))
except socket.gaierror:
for info in socket.getaddrinfo(hostname, None, socket.AF_INET):
_add('host', info[4][0])
except (socket.gaierror, OSError):
pass
except Exception as e:
log.debug("Socket fallback failed: %s", e)
return interfaces
def get_lan_ips(port: int = 8085) -> List[str]:
"""
Get list of LAN-accessible proxy addresses.
Get list of LAN-accessible proxy addresses (IPv4 only).
Returns a list of IP:port combinations that can be used to access
the proxy from other devices on the local network.
@@ -107,21 +109,22 @@ def get_lan_ips(port: int = 8085) -> List[str]:
List[str]: List of "IP:port" strings for LAN access
"""
interfaces = get_network_interfaces()
lan_addresses = []
lan_addresses: List[str] = []
for iface_ips in interfaces.values():
for ip in iface_ips:
try:
# Validate IP and check if it's a private address
addr = ipaddress.ip_address(ip)
if addr.is_private or addr.is_link_local:
lan_addresses.append(f"{ip}:{port}")
except ValueError:
addr = ipaddress.IPv4Address(ip)
except (ValueError, ipaddress.AddressValueError):
continue
if addr.is_loopback or addr.is_unspecified:
continue
if addr.is_private or addr.is_link_local:
lan_addresses.append(f"{ip}:{port}")
# Remove duplicates while preserving order
seen = set()
unique_addresses = []
# Remove duplicates while preserving order.
seen: Set[str] = set()
unique_addresses: List[str] = []
for addr in lan_addresses:
if addr not in seen:
seen.add(addr)
+343 -88
View File
@@ -15,6 +15,11 @@ import time
import ipaddress
from urllib.parse import urlparse
try:
import certifi
except Exception: # optional dependency fallback
certifi = None
from constants import (
CACHE_MAX_MB,
CACHE_TTL_MAX,
@@ -65,6 +70,23 @@ def _parse_content_length(header_block: bytes) -> int:
return 0
def _has_unsupported_transfer_encoding(header_block: bytes) -> bool:
"""True when the request uses Transfer-Encoding, which we don't stream."""
for raw_line in header_block.split(b"\r\n"):
name, sep, value = raw_line.partition(b":")
if not sep:
continue
if name.strip().lower() != b"transfer-encoding":
continue
encodings = [
token.strip().lower()
for token in value.decode(errors="replace").split(",")
if token.strip()
]
return any(token != "identity" for token in encodings)
return False
class ResponseCache:
"""Simple LRU response cache — avoids repeated relay calls."""
@@ -147,6 +169,14 @@ class ProxyServer:
_GOOGLE_DIRECT_ALLOW_EXACT = GOOGLE_DIRECT_ALLOW_EXACT
_GOOGLE_DIRECT_ALLOW_SUFFIXES = GOOGLE_DIRECT_ALLOW_SUFFIXES
_TRACE_HOST_SUFFIXES = TRACE_HOST_SUFFIXES
_DOWNLOAD_DEFAULT_EXTS = tuple(sorted(LARGE_FILE_EXTS))
_DOWNLOAD_ACCEPT_MARKERS = (
"application/octet-stream",
"application/zip",
"application/x-bittorrent",
"video/",
"audio/",
)
def __init__(self, config: dict):
self.host = config.get("listen_host", "127.0.0.1")
@@ -154,11 +184,42 @@ class ProxyServer:
self.socks_enabled = config.get("socks5_enabled", True)
self.socks_host = config.get("socks5_host", self.host)
self.socks_port = config.get("socks5_port", 1080)
if self.socks_enabled and self.socks_host == self.host \
and int(self.socks_port) == int(self.port):
raise ValueError(
f"listen_port and socks5_port must differ on the same host "
f"(both set to {self.port} on {self.host}). "
f"Change one of them in config.json."
)
self.fronter = DomainFronter(config)
self.mitm = None
self._cache = ResponseCache(max_mb=CACHE_MAX_MB)
self._direct_fail_until: dict[str, float] = {}
self._servers: list[asyncio.base_events.Server] = []
self._client_tasks: set[asyncio.Task] = set()
self._tcp_connect_timeout = self._cfg_float(
config, "tcp_connect_timeout", TCP_CONNECT_TIMEOUT, minimum=1.0,
)
self._download_min_size = self._cfg_int(
config, "chunked_download_min_size", 5 * 1024 * 1024, minimum=0,
)
self._download_chunk_size = self._cfg_int(
config, "chunked_download_chunk_size", 512 * 1024, minimum=64 * 1024,
)
self._download_max_parallel = self._cfg_int(
config, "chunked_download_max_parallel", 8, minimum=1,
)
self._download_max_chunks = self._cfg_int(
config, "chunked_download_max_chunks", 256, minimum=1,
)
self._download_extensions, self._download_any_extension = (
self._normalize_download_extensions(
config.get(
"chunked_download_extensions",
list(self._DOWNLOAD_DEFAULT_EXTS),
)
)
)
# hosts override — DNS fake-map: domain/suffix → IP
# Checked before any real DNS lookup; supports exact and suffix matching.
@@ -188,6 +249,17 @@ class ProxyServer:
self._block_hosts = self._load_host_rules(config.get("block_hosts", []))
self._bypass_hosts = self._load_host_rules(config.get("bypass_hosts", []))
# Route YouTube through the relay when requested; the Google frontend
# IP can enforce SafeSearch on the SNI-rewrite path.
if config.get("youtube_via_relay", False):
self._SNI_REWRITE_SUFFIXES = tuple(
s for s in SNI_REWRITE_SUFFIXES
if s not in self._YOUTUBE_SNI_SUFFIXES
)
log.info("youtube_via_relay enabled — YouTube routed through relay")
else:
self._SNI_REWRITE_SUFFIXES = SNI_REWRITE_SUFFIXES
try:
from mitm import MITMCertManager
self.mitm = MITMCertManager()
@@ -198,6 +270,55 @@ class ProxyServer:
# ── Host-policy helpers ───────────────────────────────────────
@staticmethod
def _cfg_int(config: dict, key: str, default: int, *, minimum: int = 1) -> int:
try:
value = int(config.get(key, default))
except (TypeError, ValueError):
value = default
return max(minimum, value)
@staticmethod
def _cfg_float(config: dict, key: str, default: float,
*, minimum: float = 0.1) -> float:
try:
value = float(config.get(key, default))
except (TypeError, ValueError):
value = default
return max(minimum, value)
@classmethod
def _normalize_download_extensions(cls, raw) -> tuple[tuple[str, ...], bool]:
values = raw if isinstance(raw, (list, tuple)) else cls._DOWNLOAD_DEFAULT_EXTS
normalized: list[str] = []
any_extension = False
seen: set[str] = set()
for item in values:
ext = str(item).strip().lower()
if not ext:
continue
if ext in {"*", ".*"}:
any_extension = True
continue
if not ext.startswith("."):
ext = "." + ext
if ext not in seen:
seen.add(ext)
normalized.append(ext)
if not normalized and not any_extension:
normalized = list(cls._DOWNLOAD_DEFAULT_EXTS)
return tuple(normalized), any_extension
def _track_current_task(self) -> asyncio.Task | None:
task = asyncio.current_task()
if task is not None:
self._client_tasks.add(task)
return task
def _untrack_task(self, task: asyncio.Task | None) -> None:
if task is not None:
self._client_tasks.discard(task)
@staticmethod
def _load_host_rules(raw) -> tuple[set[str], tuple[str, ...]]:
"""Accept a list of host strings; return (exact_set, suffix_tuple).
@@ -352,15 +473,18 @@ class ProxyServer:
self.socks_host, self.socks_port,
)
async with http_srv:
if socks_srv:
async with socks_srv:
await asyncio.gather(
http_srv.serve_forever(),
socks_srv.serve_forever(),
)
else:
await http_srv.serve_forever()
try:
async with http_srv:
if socks_srv:
async with socks_srv:
await asyncio.gather(
http_srv.serve_forever(),
socks_srv.serve_forever(),
)
else:
await http_srv.serve_forever()
except asyncio.CancelledError:
raise
async def stop(self):
"""Shut down all listeners and release relay resources."""
@@ -375,6 +499,15 @@ class ProxyServer:
except Exception:
pass
self._servers = []
current = asyncio.current_task()
client_tasks = [task for task in self._client_tasks if task is not current]
for task in client_tasks:
task.cancel()
if client_tasks:
await asyncio.gather(*client_tasks, return_exceptions=True)
self._client_tasks.clear()
try:
await self.fronter.close()
except Exception as exc:
@@ -384,6 +517,7 @@ class ProxyServer:
async def _on_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info("peername")
task = self._track_current_task()
try:
first_line = await asyncio.wait_for(reader.readline(), timeout=30)
if not first_line:
@@ -400,6 +534,16 @@ class ProxyServer:
if line in (b"\r\n", b"\n", b""):
break
if _has_unsupported_transfer_encoding(header_block):
log.warning("Unsupported Transfer-Encoding on client request")
writer.write(
b"HTTP/1.1 501 Not Implemented\r\n"
b"Connection: close\r\n"
b"Content-Length: 0\r\n\r\n"
)
await writer.drain()
return
request_line = first_line.decode(errors="replace").strip()
parts = request_line.split(" ", 2)
if len(parts) < 2:
@@ -412,11 +556,14 @@ class ProxyServer:
else:
await self._do_http(header_block, reader, writer)
except asyncio.CancelledError:
pass
except asyncio.TimeoutError:
log.debug("Timeout: %s", addr)
except Exception as e:
log.error("Error (%s): %s", addr, e)
finally:
self._untrack_task(task)
try:
writer.close()
await writer.wait_closed()
@@ -426,6 +573,7 @@ class ProxyServer:
async def _on_socks_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
addr = writer.get_extra_info("peername")
task = self._track_current_task()
try:
header = await asyncio.wait_for(reader.readexactly(2), timeout=15)
ver, nmethods = header[0], header[1]
@@ -475,11 +623,14 @@ class ProxyServer:
except asyncio.IncompleteReadError:
pass
except asyncio.CancelledError:
pass
except asyncio.TimeoutError:
log.debug("SOCKS5 timeout: %s", addr)
except Exception as e:
log.error("SOCKS5 error (%s): %s", addr, e)
finally:
self._untrack_task(task)
try:
writer.close()
await writer.wait_closed()
@@ -533,12 +684,17 @@ class ProxyServer:
# • port 443 → MITM + relay through Apps Script
# • port 80 → plain-HTTP relay through Apps Script
# • other → give up (non-HTTP; can't be relayed)
# We use a shorter connect timeout for IP literals (4 s) because
# when the route is DPI-dropped, waiting longer doesn't help and
# clients like Telegram speed up DC-rotation when we fail fast.
# We remember per-IP failures for a short while so subsequent
# connects skip the doomed direct attempt.
if _is_ip_literal(host):
if not self._direct_temporarily_disabled(host):
log.info("Direct tunnel → %s:%d (IP literal)", host, port)
ok = await self._do_direct_tunnel(host, port, reader, writer)
ok = await self._do_direct_tunnel(
host, port, reader, writer, timeout=4.0,
)
if ok:
return
self._remember_direct_failure(host, ttl=300)
@@ -606,6 +762,11 @@ class ProxyServer:
# Built-in list of domains that must be reached via Google's frontend IP
# with SNI rewritten to `front_domain` (default: www.google.com).
# Source: constants.SNI_REWRITE_SUFFIXES.
# When youtube_via_relay is enabled the YouTube suffixes are removed so
# YouTube goes through the Apps Script relay instead.
_YOUTUBE_SNI_SUFFIXES = frozenset({
"youtube.com", "youtu.be", "youtube-nocookie.com",
})
_SNI_REWRITE_SUFFIXES = SNI_REWRITE_SUFFIXES
def _sni_rewrite_ip(self, host: str) -> str | None:
@@ -724,14 +885,21 @@ class ProxyServer:
errors: list[str] = []
loop = asyncio.get_running_loop()
# Strip IPv6 brackets (CONNECT may deliver "[::1]" as the hostname).
# ipaddress.ip_address() rejects the bracketed form, which would
# otherwise force a DNS lookup for an IP literal and fail.
lookup_target = target.strip()
if lookup_target.startswith("[") and lookup_target.endswith("]"):
lookup_target = lookup_target[1:-1]
try:
ipaddress.ip_address(target)
candidates = [(0, target)]
ipaddress.ip_address(lookup_target)
candidates = [(0, lookup_target)]
except ValueError:
try:
infos = await asyncio.wait_for(
loop.getaddrinfo(
target,
lookup_target,
port,
family=socket.AF_UNSPEC,
type=socket.SOCK_STREAM,
@@ -739,7 +907,7 @@ class ProxyServer:
timeout=timeout,
)
except Exception as exc:
raise OSError(f"dns lookup failed for {target}: {exc!r}") from exc
raise OSError(f"dns lookup failed for {lookup_target}: {exc!r}") from exc
candidates = []
seen = set()
@@ -772,7 +940,8 @@ class ProxyServer:
async def _do_direct_tunnel(self, host: str, port: int,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
connect_ip: str | None = None):
connect_ip: str | None = None,
timeout: float | None = None):
"""Pipe raw TLS bytes directly to the target server.
connect_ip overrides DNS: the TCP connection goes to that IP
@@ -782,8 +951,13 @@ class ProxyServer:
normal edge instead of being forced onto the fronting IP.
"""
target_ip = connect_ip or host
effective_timeout = (
self._tcp_connect_timeout if timeout is None else float(timeout)
)
try:
r_remote, w_remote = await self._open_tcp_connection(target_ip, port, timeout=10)
r_remote, w_remote = await self._open_tcp_connection(
target_ip, port, timeout=effective_timeout,
)
except Exception as e:
log.error("Direct tunnel connect failed (%s via %s): %s",
host, target_ip, e)
@@ -834,7 +1008,7 @@ class ProxyServer:
# Step 1: MITM — accept TLS from the browser
ssl_ctx_server = self.mitm.get_server_context(host)
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
transport = writer.transport
protocol = transport.get_protocol()
try:
@@ -848,6 +1022,11 @@ class ProxyServer:
# Step 2: open outgoing TLS to target IP with the safe SNI
ssl_ctx_client = ssl.create_default_context()
if certifi is not None:
try:
ssl_ctx_client.load_verify_locations(cafile=certifi.where())
except Exception:
pass
if not self.fronter.verify_ssl:
ssl_ctx_client.check_hostname = False
ssl_ctx_client.verify_mode = ssl.CERT_NONE
@@ -858,7 +1037,7 @@ class ProxyServer:
ssl=ssl_ctx_client,
server_hostname=sni_out,
),
timeout=10,
timeout=self._tcp_connect_timeout,
)
except Exception as e:
log.error("SNI-rewrite outbound connect failed (%s via %s): %s",
@@ -901,7 +1080,7 @@ class ProxyServer:
ssl_ctx = self.mitm.get_server_context(host)
# Upgrade the existing connection to TLS (we are the server)
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
transport = writer.transport
protocol = transport.get_protocol()
@@ -914,17 +1093,15 @@ class ProxyServer:
# • Telegram Desktop / MTProto over port 443 sends obfuscated
# non-TLS bytes — we literally cannot decrypt these, and
# since the target IP is blocked we can't direct-tunnel
# either. The only workaround is to configure Telegram as
# an HTTP proxy (not SOCKS5), so it sends hostnames our
# SNI-rewrite path can handle.
# either. Telegram will rotate to another DC on its own;
# failing fast here lets that happen sooner.
# • Client CONNECTs but never speaks TLS (some probes).
if _is_ip_literal(host) and port == 443:
log.warning(
"MITM TLS handshake failed for %s:%d (%s). "
"Likely non-TLS traffic (e.g. Telegram MTProto over "
"SOCKS5). Cannot relay raw TCP to a blocked IP — "
"use the HTTP proxy instead so hostnames are preserved.",
host, port, e,
log.info(
"Non-TLS traffic on %s:%d (likely Telegram MTProto / "
"obfuscated protocol). This DC appears blocked; the "
"client should rotate to another endpoint shortly.",
host, port,
)
elif port != 443:
log.debug(
@@ -959,16 +1136,47 @@ class ProxyServer:
break
header_block = first_line
oversized_headers = False
while True:
line = await asyncio.wait_for(reader.readline(), timeout=10)
header_block += line
if len(header_block) > MAX_HEADER_BYTES:
oversized_headers = True
break
if line in (b"\r\n", b"\n", b""):
break
# Reject truncated / oversized header blocks cleanly rather
# than forwarding a half-parsed request to the relay — doing
# so would send malformed JSON payloads to Apps Script and
# leave the client hanging until its own timeout fires.
if oversized_headers:
log.warning(
"MITM header block exceeds %d bytes — closing (%s)",
MAX_HEADER_BYTES, host,
)
try:
writer.write(
b"HTTP/1.1 431 Request Header Fields Too Large\r\n"
b"Connection: close\r\n"
b"Content-Length: 0\r\n\r\n"
)
await writer.drain()
except Exception:
pass
break
# Read body
body = b""
if _has_unsupported_transfer_encoding(header_block):
log.warning("Unsupported Transfer-Encoding → %s:%d", host, port)
writer.write(
b"HTTP/1.1 501 Not Implemented\r\n"
b"Connection: close\r\n"
b"Content-Length: 0\r\n\r\n"
)
await writer.drain()
break
length = _parse_content_length(header_block)
if length > MAX_REQUEST_BODY_BYTES:
raise ValueError(f"Request body too large: {length} bytes")
@@ -990,11 +1198,11 @@ class ProxyServer:
if b":" in raw_line:
k, v = raw_line.decode(errors="replace").split(":", 1)
headers[k.strip()] = v.strip()
# Shortening the length of X API URLs to prevent relay errors.
if host == "x.com" and re.match(r"/i/api/graphql/[^/]+/[^?]+\?variables=", path):
path = path.split("&")[0]
# MITM traffic arrives as origin-form paths; SOCKS/plain HTTP can
# also send absolute-form requests. Normalize both to full URLs.
if path.startswith("http://") or path.startswith("https://"):
@@ -1008,27 +1216,33 @@ class ProxyServer:
log.info("MITM → %s %s", method, url)
# ── CORS: extract relevant request headers ────────────────────
origin = next(
(v for k, v in headers.items() if k.lower() == "origin"), ""
# ── CORS: extract relevant request headers ─────────────
origin = self._header_value(headers, "origin")
acr_method = self._header_value(
headers, "access-control-request-method",
)
acr_method = next(
(v for k, v in headers.items()
if k.lower() == "access-control-request-method"), ""
)
acr_headers = next(
(v for k, v in headers.items()
if k.lower() == "access-control-request-headers"), ""
acr_headers = self._header_value(
headers, "access-control-request-headers",
)
# CORS preflight — respond directly; UrlFetchApp doesn't
# support OPTIONS so forwarding it would always fail.
# CORS preflight — respond directly. Apps Script's
# UrlFetchApp does not support the OPTIONS method, so
# forwarding preflights would always fail and break every
# cross-origin fetch/XHR the browser runs through us.
if method.upper() == "OPTIONS" and acr_method:
log.debug("CORS preflight → %s (responding locally)", url[:60])
writer.write(self._cors_preflight_response(origin, acr_method, acr_headers))
log.debug(
"CORS preflight → %s (responding locally)",
url[:60],
)
writer.write(self._cors_preflight_response(
origin, acr_method, acr_headers,
))
await writer.drain()
continue
if await self._maybe_stream_download(method, url, headers, body, writer):
continue
# Check local cache first (GET only)
response = None
if self._cache_allowed(method, url, headers, body):
@@ -1057,8 +1271,10 @@ class ProxyServer:
self._cache.put(url, response, ttl)
log.debug("Cached (%ds): %s", ttl, url[:60])
# Inject permissive CORS headers whenever the browser
# sent an Origin (cross-origin XHR / fetch).
# Inject permissive CORS headers whenever the browser sent
# an Origin (cross-origin XHR / fetch). Without this, the
# browser blocks the response even though the relay fetched
# it successfully.
if origin and response:
response = self._inject_cors_headers(response, origin)
@@ -1077,11 +1293,16 @@ class ProxyServer:
log.error("MITM handler error (%s): %s", host, e)
break
# ── CORS helpers ──────────────────────────────────────────────────────────
# ── CORS helpers ──────────────────────────────────────────────
@staticmethod
def _cors_preflight_response(origin: str, acr_method: str, acr_headers: str) -> bytes:
"""Return a 204 No Content response that satisfies a CORS preflight."""
def _cors_preflight_response(origin: str, acr_method: str,
acr_headers: str) -> bytes:
"""Build a 204 response that satisfies a CORS preflight locally.
Apps Script's UrlFetchApp does not support OPTIONS, so we have to
answer preflights here instead of forwarding them.
"""
allow_origin = origin or "*"
allow_methods = (
f"{acr_method}, GET, POST, PUT, DELETE, PATCH, OPTIONS"
@@ -1103,37 +1324,29 @@ class ProxyServer:
@staticmethod
def _inject_cors_headers(response: bytes, origin: str) -> bytes:
"""Inject CORS headers only if the upstream response lacks them.
"""Strip existing Access-Control-* headers and add permissive ones.
We must NOT overwrite the origin server's CORS headers: sites like
x.com return carefully-scoped Access-Control-Allow-Headers that list
specific custom headers (e.g. x-csrf-token). Replacing them with
wildcards together with Allow-Credentials: true makes browsers
reject the response (per the Fetch spec, "*" is literal when
credentials are included), which the site then blames on privacy
extensions. So we only fill in what the server omitted.
Keeps the body untouched; only rewrites the header block. Using
the exact browser-supplied Origin (rather than "*") is required
when the request is credentialed (cookies, Authorization).
"""
sep = b"\r\n\r\n"
if sep not in response:
return response
header_section, body = response.split(sep, 1)
lines = header_section.decode(errors="replace").split("\r\n")
existing = {ln.split(":", 1)[0].strip().lower()
for ln in lines if ":" in ln}
# If the upstream already handled CORS, leave it completely alone.
if "access-control-allow-origin" in existing:
return response
# Otherwise inject a minimal, credential-safe set (no wildcards,
# since wildcards combined with credentials are invalid).
lines = [ln for ln in lines
if not ln.lower().startswith("access-control-")]
allow_origin = origin or "*"
additions = [f"Access-Control-Allow-Origin: {allow_origin}"]
if allow_origin != "*":
additions.append("Access-Control-Allow-Credentials: true")
additions.append("Vary: Origin")
return ("\r\n".join(lines + additions) + "\r\n\r\n").encode() + body
lines += [
f"Access-Control-Allow-Origin: {allow_origin}",
"Access-Control-Allow-Credentials: true",
"Access-Control-Allow-Methods: GET, POST, PUT, DELETE, PATCH, OPTIONS",
"Access-Control-Allow-Headers: *",
"Access-Control-Expose-Headers: *",
"Vary: Origin",
]
return ("\r\n".join(lines) + "\r\n\r\n").encode() + body
async def _relay_smart(self, method, url, headers, body):
"""Choose optimal relay strategy based on request type.
@@ -1156,22 +1369,67 @@ class ProxyServer:
# Only probe with Range when the URL looks like a big file.
if self._is_likely_download(url, headers):
return await self.fronter.relay_parallel(
method, url, headers, body
method,
url,
headers,
body,
chunk_size=self._download_chunk_size,
max_parallel=self._download_max_parallel,
max_chunks=self._download_max_chunks,
min_size=self._download_min_size,
)
return await self.fronter.relay(method, url, headers, body)
def _is_likely_download(self, url: str, headers: dict) -> bool:
"""Heuristic: is this URL likely a large file download?"""
path = url.split("?")[0].lower()
for ext in LARGE_FILE_EXTS:
if self._download_any_extension:
return True
for ext in self._download_extensions:
if path.endswith(ext):
return True
accept = self._header_value(headers, "accept").lower()
if any(marker in accept for marker in self._DOWNLOAD_ACCEPT_MARKERS):
return True
return False
async def _maybe_stream_download(self, method: str, url: str,
headers: dict | None, body: bytes,
writer) -> bool:
if method.upper() != "GET" or body:
return False
if headers:
for key in headers:
if key.lower() == "range":
return False
effective_headers = headers or {}
if not self._is_likely_download(url, effective_headers):
return False
if not self.fronter.stream_download_allowed(url):
return False
return await self.fronter.stream_parallel_download(
url,
effective_headers,
writer,
chunk_size=self._download_chunk_size,
max_parallel=self._download_max_parallel,
max_chunks=self._download_max_chunks,
min_size=self._download_min_size,
)
# ── Plain HTTP forwarding ─────────────────────────────────────
async def _do_http(self, header_block: bytes, reader, writer):
body = b""
if _has_unsupported_transfer_encoding(header_block):
log.warning("Unsupported Transfer-Encoding on plain HTTP request")
writer.write(
b"HTTP/1.1 501 Not Implemented\r\n"
b"Connection: close\r\n"
b"Content-Length: 0\r\n\r\n"
)
await writer.drain()
return
length = _parse_content_length(header_block)
if length > MAX_REQUEST_BODY_BYTES:
writer.write(b"HTTP/1.1 413 Content Too Large\r\n\r\n")
@@ -1194,24 +1452,21 @@ class ProxyServer:
k, v = raw_line.decode(errors="replace").split(":", 1)
headers[k.strip()] = v.strip()
# ── CORS preflight over plain HTTP ────────────────────────────
origin = next(
(v for k, v in headers.items() if k.lower() == "origin"), ""
)
acr_method = next(
(v for k, v in headers.items()
if k.lower() == "access-control-request-method"), ""
)
acr_headers_val = next(
(v for k, v in headers.items()
if k.lower() == "access-control-request-headers"), ""
)
# ── CORS preflight over plain HTTP ────────────────────────────
origin = self._header_value(headers, "origin")
acr_method = self._header_value(headers, "access-control-request-method")
acr_headers = self._header_value(headers, "access-control-request-headers")
if method.upper() == "OPTIONS" and acr_method:
log.debug("CORS preflight (HTTP) → %s (responding locally)", url[:60])
writer.write(self._cors_preflight_response(origin, acr_method, acr_headers_val))
writer.write(self._cors_preflight_response(
origin, acr_method, acr_headers,
))
await writer.drain()
return
if await self._maybe_stream_download(method, url, headers, body, writer):
return
# Cache check for GET
response = None
if self._cache_allowed(method, url, headers, body):
@@ -1227,9 +1482,9 @@ class ProxyServer:
if ttl > 0:
self._cache.put(url, response, ttl)
# Inject CORS headers for cross-origin requests
if origin and response:
response = self._inject_cors_headers(response, origin)
self._log_response_summary(url, response)
writer.write(response)