"""
Local HTTP proxy server.

Intercepts the user's browser traffic and forwards everything through the
Apps Script relay (MITM-decrypts HTTPS locally, forwards requests as JSON
to script.google.com fronted through www.google.com).
"""

import asyncio
import logging
import re
import socket
import ssl
import time
import ipaddress

try:
    import certifi
except Exception:  # optional dependency fallback
    certifi = None

from core.constants import (
    CACHE_MAX_MB,
    CLIENT_IDLE_TIMEOUT,
    DEFAULT_BYPASS_HOSTS,
    GOOGLE_DIRECT_ALLOW_EXACT,
    GOOGLE_DIRECT_ALLOW_SUFFIXES,
    GOOGLE_DIRECT_EXACT_EXCLUDE,
    GOOGLE_DIRECT_SUFFIX_EXCLUDE,
    GOOGLE_OWNED_EXACT,
    GOOGLE_OWNED_SUFFIXES,
    LARGE_FILE_EXTS,
    MAX_HEADER_BYTES,
    MAX_REQUEST_BODY_BYTES,
    RELAY_URL_PATTERNS,
    SNI_REWRITE_SUFFIXES,
    TCP_CONNECT_TIMEOUT,
    TRACE_HOST_SUFFIXES,
    UNCACHEABLE_HEADER_NAMES,
)
from relay.domain_fronter import DomainFronter
from .socks5 import negotiate_socks5
from .proxy_support import (
    ResponseCache,
    cors_preflight_response,
    has_unsupported_transfer_encoding,
    header_value,
    host_matches_rules,
    inject_cors_headers,
    is_ip_literal,
    load_host_rules,
    log_response_summary,
    parse_content_length,
)
from relay.relay_response import split_raw_response

log = logging.getLogger("Proxy")


class ProxyServer:
    """Local HTTP + SOCKS5 proxy that routes client traffic per host policy.

    Depending on the target host/port a connection is blocked, tunnelled
    directly, SNI-rewritten toward the configured front IP, or MITM-decrypted
    and relayed through ``DomainFronter``.
    """

    # Pulled from constants.py so users can override any subset via config.
    _GOOGLE_DIRECT_EXACT_EXCLUDE = GOOGLE_DIRECT_EXACT_EXCLUDE
    _GOOGLE_DIRECT_SUFFIX_EXCLUDE = GOOGLE_DIRECT_SUFFIX_EXCLUDE
    _GOOGLE_DIRECT_ALLOW_EXACT = GOOGLE_DIRECT_ALLOW_EXACT
    _GOOGLE_DIRECT_ALLOW_SUFFIXES = GOOGLE_DIRECT_ALLOW_SUFFIXES
    _TRACE_HOST_SUFFIXES = TRACE_HOST_SUFFIXES
    _DOWNLOAD_DEFAULT_EXTS = tuple(sorted(LARGE_FILE_EXTS))
    _DOWNLOAD_ACCEPT_MARKERS = (
        "application/octet-stream",
        "application/zip",
        "application/x-bittorrent",
        "video/",
        "audio/",
    )

    def __init__(self, config: dict):
        """Build the proxy from a config mapping.

        Raises:
            ValueError: if the HTTP and SOCKS5 listeners would share the
                same host and port.
            SystemExit: if the MITM certificate manager cannot be imported.
        """
        self.host = config.get("listen_host", "127.0.0.1")
        # Prefer the new key (http_port) but keep listen_port for old configs.
        self.port = config.get("http_port", config.get("listen_port", 8080))
        self.socks_enabled = True
        self.socks_host = config.get("socks5_host", self.host)
        self.socks_port = config.get("socks5_port", 1080)
        if (
            self.socks_enabled
            and self.socks_host == self.host
            and int(self.socks_port) == int(self.port)
        ):
            raise ValueError(
                f"http_port and socks5_port must differ on the same host "
                f"(both set to {self.port} on {self.host}). "
                f"Change one of them in config.json."
            )

        self.fronter = DomainFronter(config)
        self.mitm = None
        self._cache = ResponseCache(max_mb=CACHE_MAX_MB)
        self._direct_fail_until: dict[str, float] = {}
        self._servers: list[asyncio.base_events.Server] = []
        self._client_tasks: set[asyncio.Task] = set()
        # Strong references to fire-and-forget tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task] = set()
        self._tcp_connect_timeout = self._cfg_float(
            config, "tcp_connect_timeout", TCP_CONNECT_TIMEOUT, minimum=1.0,
        )
        self._download_min_size = self._cfg_int(
            config, "chunked_download_min_size", 5 * 1024 * 1024, minimum=0,
        )
        self._download_chunk_size = self._cfg_int(
            config, "chunked_download_chunk_size", 512 * 1024, minimum=64 * 1024,
        )
        self._download_max_parallel = self._cfg_int(
            config, "chunked_download_max_parallel", 8, minimum=1,
        )
        self._download_max_chunks = self._cfg_int(
            config, "chunked_download_max_chunks", 256, minimum=1,
        )
        self._warmup_before_listen = True
        self._warmup_timeout = 20.0
        self._download_extensions, self._download_any_extension = (
            self._normalize_download_extensions(
                config.get(
                    "chunked_download_extensions",
                    list(self._DOWNLOAD_DEFAULT_EXTS),
                )
            )
        )

        # hosts override — DNS fake-map: domain/suffix → IP
        # Checked before any real DNS lookup; supports exact and suffix matching.
        # Keys are normalized the same way _hosts_ip() normalizes the looked-up
        # host (lower-case, no trailing dot); otherwise an entry such as
        # "YouTube.com" could never match.
        raw_hosts = config.get("hosts", {}) or {}
        if not isinstance(raw_hosts, dict):
            log.warning("Ignoring 'hosts' config: expected a mapping")
            raw_hosts = {}
        self._hosts: dict[str, str] = {
            str(name).lower().rstrip("."): ip
            for name, ip in raw_hosts.items()
        }
        configured_direct_exclude = config.get("direct_google_exclude", [])
        self._direct_google_exclude = {
            h.lower().rstrip(".")
            for h in (
                list(self._GOOGLE_DIRECT_EXACT_EXCLUDE)
                + list(configured_direct_exclude)
            )
        }
        configured_direct_allow = config.get("direct_google_allow", [])
        self._direct_google_allow = {
            h.lower().rstrip(".")
            for h in (
                list(self._GOOGLE_DIRECT_ALLOW_EXACT)
                + list(configured_direct_allow)
            )
        }

        # ── Per-host policy ────────────────────────────────────────
        # block_hosts — refuse traffic entirely (close or 403)
        # direct_hosts — route directly (no MITM, no relay)
        # bypass_hosts — legacy alias kept for backward compatibility
        # Both accept exact hostnames and leading-dot suffix patterns,
        # e.g. ".local" matches any *.local domain.
        self._block_hosts = load_host_rules(config.get("block_hosts", []))

        # ── Adblock host lists ─────────────────────────────────────
        # adblock_lists: list of URLs to hosts-format blocklists.
        # Lists are loaded from disk cache at startup (fast), then
        # re-downloaded in background when the cache is stale.
        self._adblock_urls: list[str] = [
            str(u).strip() for u in config.get("adblock_lists", []) if u
        ]
        if self._adblock_urls:
            try:
                from core.adblock import load_all
                _ab_domains = load_all(self._adblock_urls)
                self._adblock_hosts = load_host_rules(_ab_domains)
                log.info(
                    "Adblock: %d domains active (%d lists)",
                    len(_ab_domains), len(self._adblock_urls),
                )
            except Exception as exc:
                log.warning("Adblock: failed to load lists at startup: %s", exc)
                self._adblock_hosts = (set(), ())
        else:
            self._adblock_hosts = (set(), ())

        direct_hosts = config.get("direct_hosts", [])
        bypass_hosts = config.get("bypass_hosts")
        if bypass_hosts is None:
            bypass_hosts = list(DEFAULT_BYPASS_HOSTS)
        self._bypass_hosts = load_host_rules(
            list(bypass_hosts) + list(direct_hosts)
        )

        # Route YouTube through the relay when requested; the Google frontend
        # IP can enforce SafeSearch on the SNI-rewrite path.
        if config.get("youtube_via_relay", False):
            self._SNI_REWRITE_SUFFIXES = tuple(
                s for s in SNI_REWRITE_SUFFIXES
                if s not in self._YOUTUBE_SNI_SUFFIXES
            )
            log.info("youtube_via_relay enabled — YouTube routed through relay")
        else:
            self._SNI_REWRITE_SUFFIXES = SNI_REWRITE_SUFFIXES

        # relay_url_patterns: list of URL path prefixes
        # (e.g. "youtube.com/youtubei/") that are forced through the Apps Script
        # relay even when youtube_via_relay is false.
        # The host is extracted and removed from SNI-rewrite so the proxy can
        # MITM-decrypt and inspect paths. Requests whose URL contains the full
        # pattern go to relay; all other paths on that host are forwarded
        # directly via SNI-rewrite HTTP (fast path).
        # When youtube_via_relay is true, RELAY_URL_PATTERNS is ignored entirely
        # so all of youtube.com goes through relay without path inspection.
        # Defaults to RELAY_URL_PATTERNS from constants.py; config key extends it.
        _youtube_via_relay: bool = config.get("youtube_via_relay", False)
        # str() on both sides: the filter already tolerated non-string
        # entries, so the stored value must too.
        relay_patterns: list[str] = [
            str(p).strip()
            for p in config.get("relay_url_patterns", [])
            if str(p).strip()
        ]
        if not _youtube_via_relay:
            relay_patterns = list(RELAY_URL_PATTERNS) + relay_patterns
        # Store the full patterns for per-request matching in _relay_smart.
        self._relay_url_patterns: tuple[str, ...] = tuple(
            re.sub(r'^https?://', '', p).lower() for p in relay_patterns
        )
        if relay_patterns:
            forced: set[str] = set()
            for p in self._relay_url_patterns:
                host_part = p.split('/')[0].lstrip('.')
                if host_part:
                    forced.add(host_part)
            # Remove matched suffixes from SNI-rewrite so they get MITM'd.
            self._SNI_REWRITE_SUFFIXES = tuple(
                s for s in self._SNI_REWRITE_SUFFIXES
                if not any(
                    s == h or s.endswith('.' + h) or h.endswith('.' + s)
                    for h in forced
                )
            )
            log.info(
                "relay_url_patterns: MITM forced on %s; relay only for: %s",
                ', '.join(sorted(forced)),
                ', '.join(self._relay_url_patterns),
            )
        else:
            self._relay_url_patterns = ()

        try:
            from .mitm import MITMCertManager, CA_CERT_FILE
            self.mitm = MITMCertManager()
            self._ca_cert_file = CA_CERT_FILE
        except ImportError:
            log.error("Apps Script relay requires the 'cryptography' package.")
            log.error("Run: pip install cryptography")
            raise SystemExit(1)

        # When LAN sharing is active, serve the CA cert over HTTP so other
        # devices on the network can download and install it easily.
        self._lan_sharing: bool = bool(config.get("lan_sharing", False))

    # ── Host-policy helpers ───────────────────────────────────────

    @staticmethod
    def _cfg_int(config: dict, key: str, default: int, *, minimum: int = 1) -> int:
        """Read ``config[key]`` as int, falling back to ``default`` on a bad
        value, and clamp the result to at least ``minimum``."""
        try:
            value = int(config.get(key, default))
        except (TypeError, ValueError):
            value = default
        return max(minimum, value)

    @staticmethod
    def _cfg_float(config: dict, key: str, default: float, *, minimum: float = 0.1) -> float:
        """Read ``config[key]`` as float, falling back to ``default`` on a bad
        value, and clamp the result to at least ``minimum``."""
        try:
            value = float(config.get(key, default))
        except (TypeError, ValueError):
            value = default
        return max(minimum, value)

    @classmethod
    def _normalize_download_extensions(cls, raw) -> tuple[tuple[str, ...], bool]:
        """Normalize a list of file extensions.

        Returns ``(extensions, any_extension)``: extensions are lower-cased,
        dot-prefixed and de-duplicated in input order; ``any_extension`` is
        True when ``"*"`` or ``".*"`` was present. A non-list/tuple ``raw``,
        or an input yielding nothing at all, falls back to the class defaults.
        """
        values = raw if isinstance(raw, (list, tuple)) else cls._DOWNLOAD_DEFAULT_EXTS
        normalized: list[str] = []
        any_extension = False
        seen: set[str] = set()
        for item in values:
            ext = str(item).strip().lower()
            if not ext:
                continue
            if ext in {"*", ".*"}:
                any_extension = True
                continue
            if not ext.startswith("."):
                ext = "." + ext
            if ext not in seen:
                seen.add(ext)
                normalized.append(ext)
        if not normalized and not any_extension:
            normalized = list(cls._DOWNLOAD_DEFAULT_EXTS)
        return tuple(normalized), any_extension

    def _track_current_task(self) -> asyncio.Task | None:
        """Register the running task in ``_client_tasks`` so stop() can cancel it."""
        task = asyncio.current_task()
        if task is not None:
            self._client_tasks.add(task)
        return task

    def _untrack_task(self, task: asyncio.Task | None) -> None:
        """Remove ``task`` from ``_client_tasks`` (no-op for None / unknown)."""
        if task is not None:
            self._client_tasks.discard(task)

    def _is_blocked(self, host: str) -> bool:
        """True if ``host`` matches the block_hosts or adblock rules."""
        return (
            host_matches_rules(host, self._block_hosts)
            or host_matches_rules(host, self._adblock_hosts)
        )

    async def _refresh_adblock_lists(self) -> None:
        """Background task: re-download stale adblock lists and hot-swap rules."""
        if not self._adblock_urls:
            return
        try:
            from core.adblock import refresh_all

            def _update(domains: list[str]) -> None:
                self._adblock_hosts = load_host_rules(domains)
                log.info(
                    "Adblock: rules updated — %d domains active", len(domains)
                )

            await refresh_all(self._adblock_urls, callback=_update)
        except Exception as exc:
            log.warning("Adblock: background refresh failed: %s", exc)

    def _is_bypassed(self, host: str) -> bool:
        """True if ``host`` matches the direct_hosts/bypass_hosts rules."""
        return host_matches_rules(host, self._bypass_hosts)

    def _cache_allowed(self, method: str, url: str,
                       headers: dict | None, body: bytes) -> bool:
        """True if the request may use the local response cache: a body-less
        GET with none of UNCACHEABLE_HEADER_NAMES set, for a URL the fronter
        classifies as a static asset."""
        if method.upper() != "GET" or body:
            return False
        for name in UNCACHEABLE_HEADER_NAMES:
            if header_value(headers, name):
                return False
        return self.fronter._is_static_asset_url(url)

    async def start(self):
        """Optionally wait for relay warmup, open the listeners, and serve
        until cancelled. A SOCKS5 bind failure is logged and the HTTP
        listener keeps running on its own."""
        if self._warmup_before_listen:
            log.info(
                "Relay warmup in progress... waiting up to %.0fs before opening listeners",
                self._warmup_timeout,
            )
            ready = await self.fronter.wait_until_warm(timeout=self._warmup_timeout)
            if ready:
                log.info("Relay warmup complete — enabling HTTP/SOCKS listeners")
            else:
                log.warning(
                    "Relay warmup timed out after %.0fs — starting listeners anyway",
                    self._warmup_timeout,
                )

        http_srv = await asyncio.start_server(self._on_client, self.host, self.port)
        socks_srv = None
        if self.socks_enabled:
            try:
                socks_srv = await asyncio.start_server(
                    self._on_socks_client, self.socks_host, self.socks_port
                )
            except OSError as e:
                log.error("SOCKS5 listener failed on %s:%d: %s",
                          self.socks_host, self.socks_port, e)

        self._servers = [s for s in (http_srv, socks_srv) if s]

        log.info(
            "HTTP proxy listening on %s:%d",
            self.host, self.port,
        )
        if socks_srv:
            log.info(
                "SOCKS5 proxy listening on %s:%d",
                self.socks_host, self.socks_port,
            )

        # Kick off adblock refresh in the background — won't block startup.
        # Keep a reference until the task is done so it cannot be
        # garbage-collected mid-flight and so stop() can cancel it.
        if self._adblock_urls:
            refresh_task = asyncio.create_task(self._refresh_adblock_lists())
            self._background_tasks.add(refresh_task)
            refresh_task.add_done_callback(self._background_tasks.discard)

        async with http_srv:
            if socks_srv:
                async with socks_srv:
                    await asyncio.gather(
                        http_srv.serve_forever(),
                        socks_srv.serve_forever(),
                    )
            else:
                await http_srv.serve_forever()

    async def stop(self):
        """Shut down all listeners and release relay resources."""
        for srv in self._servers:
            try:
                srv.close()
            except Exception:
                pass
        for srv in self._servers:
            try:
                await srv.wait_closed()
            except Exception:
                pass
        self._servers = []

        current = asyncio.current_task()
        client_tasks = [task for task in self._client_tasks if task is not current]
        # Background helpers (adblock refresh) are cancelled together with
        # the client handlers.
        background_tasks = [
            task for task in self._background_tasks if task is not current
        ]
        pending = client_tasks + background_tasks
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._client_tasks.clear()
        self._background_tasks.clear()

        try:
            await self.fronter.close()
        except Exception as exc:
            log.debug("fronter.close: %s", exc)

    # ── client handler ────────────────────────────────────────────

    async def _serve_ca_cert(self, writer: asyncio.StreamWriter) -> None:
        """Serve the MITM CA certificate so LAN devices can install it.

        Responds 404 when no certificate path is set or the file cannot be
        read.
        """
        ca_path = getattr(self, "_ca_cert_file", None)
        cert_data = None
        if ca_path:
            # EAFP: open directly instead of exists()+open(), which races
            # with the file being removed in between.
            try:
                with open(ca_path, "rb") as f:
                    cert_data = f.read()
            except OSError:
                cert_data = None
        if cert_data is None:
            writer.write(
                b"HTTP/1.1 404 Not Found\r\n"
                b"Content-Length: 0\r\n"
                b"Connection: close\r\n\r\n"
            )
            await writer.drain()
            return
        headers = (
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: application/x-x509-ca-cert\r\n"
            b"Content-Disposition: attachment; filename=\"ca.crt\"\r\n"
            + b"Content-Length: " + str(len(cert_data)).encode() + b"\r\n"
            + b"Connection: close\r\n\r\n"
        )
        writer.write(headers + cert_data)
        await writer.drain()
        log.info("Served CA certificate to LAN device")

    async def _on_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Handle one connection on the HTTP listener.

        Reads the request head, then dispatches: ``GET /ca.crt`` (only with
        LAN sharing) serves the CA cert, ``CONNECT`` opens a tunnel, anything
        else goes to ``_do_http``. The client writer is always closed on exit.
        """
        addr = writer.get_extra_info("peername")
        task = self._track_current_task()
        try:
            first_line = await asyncio.wait_for(reader.readline(), timeout=30)
            if not first_line:
                return

            # Read remaining headers
            header_block = first_line
            while True:
                line = await asyncio.wait_for(reader.readline(), timeout=10)
                header_block += line
                if len(header_block) > MAX_HEADER_BYTES:
                    log.warning("Request header block exceeds cap — closing")
                    # Tell the client why, matching _relay_http_stream.
                    try:
                        writer.write(
                            b"HTTP/1.1 431 Request Header Fields Too Large\r\n"
                            b"Connection: close\r\n"
                            b"Content-Length: 0\r\n\r\n"
                        )
                        await writer.drain()
                    except Exception:
                        pass
                    return
                if line in (b"\r\n", b"\n", b""):
                    break

            if has_unsupported_transfer_encoding(header_block):
                log.warning("Unsupported Transfer-Encoding on client request")
                writer.write(
                    b"HTTP/1.1 501 Not Implemented\r\n"
                    b"Connection: close\r\n"
                    b"Content-Length: 0\r\n\r\n"
                )
                await writer.drain()
                return

            request_line = first_line.decode(errors="replace").strip()
            parts = request_line.split(" ", 2)
            if len(parts) < 2:
                return

            method = parts[0].upper()
            path = parts[1]

            if method == "GET" and path == "/ca.crt" and self._lan_sharing:
                await self._serve_ca_cert(writer)
                return

            if method == "CONNECT":
                await self._do_connect(parts[1], reader, writer)
            else:
                await self._do_http(header_block, reader, writer)

        except asyncio.CancelledError:
            pass
        except asyncio.TimeoutError:
            log.debug("Timeout: %s", addr)
        except Exception as e:
            log.error("Error (%s): %s", addr, e)
        finally:
            self._untrack_task(task)
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass

    async def _on_socks_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Handle one connection on the SOCKS5 listener: negotiate, then route
        the requested target through ``_handle_target_tunnel``."""
        addr = writer.get_extra_info("peername")
        task = self._track_current_task()
        try:
            result = await negotiate_socks5(reader, writer)
            if result is None:
                return
            host, port = result

            log.info("SOCKS5 CONNECT → %s:%d", host, port)
            await self._handle_target_tunnel(host, port, reader, writer)

        except asyncio.IncompleteReadError:
            pass
        except asyncio.CancelledError:
            pass
        except asyncio.TimeoutError:
            log.debug("SOCKS5 timeout: %s", addr)
        except Exception as e:
            log.error("SOCKS5 error (%s): %s", addr, e)
        finally:
            self._untrack_task(task)
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass

    # ── CONNECT (HTTPS tunnelling) ────────────────────────────────

    async def _do_connect(self, target: str, reader, writer):
        """Handle an HTTP CONNECT request for ``target`` ("host[:port]").

        A missing port defaults to 443. A non-numeric port or an empty host
        is answered with 400; otherwise 200 is sent and the connection is
        routed through ``_handle_target_tunnel``.
        """
        host, sep, port_str = target.rpartition(":")
        if not sep or target.endswith("]"):
            # No port given: either no colon at all ("example.com") or the
            # last colon belongs to a bare bracketed IPv6 literal ("[::1]").
            # rpartition() would otherwise hand the whole target to int()
            # and reject the request.
            host, port_str = target, ""
        try:
            port = int(port_str) if port_str else 443
        except ValueError:
            log.warning("CONNECT invalid target: %r", target)
            writer.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            await writer.drain()
            return
        if not host:
            log.warning("CONNECT invalid target: %r", target)
            writer.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            await writer.drain()
            return

        log.info("CONNECT → %s:%d", host, port)

        writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
        await writer.drain()

        await self._handle_target_tunnel(host, port, reader, writer)

    async def _handle_target_tunnel(self, host: str, port: int,
                                    reader: asyncio.StreamReader,
                                    writer: asyncio.StreamWriter):
        """Route a target connection through the Apps Script relay."""
        # ── Block / bypass policy ─────────────────────────────────
        if self._is_blocked(host):
            log.warning("BLOCKED → %s:%d (matches block_hosts)", host, port)
            try:
                writer.write(b"HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\n\r\n")
                await writer.drain()
            except Exception:
                pass
            return

        if self._is_bypassed(host):
            log.info("Direct tunnel → %s:%d (matches direct_hosts/bypass_hosts)", host, port)
            await self._do_direct_tunnel(host, port, reader, writer)
            return

        # ── IP-literal destinations ───────────────────────────────
        # Prefer a direct tunnel first (works for unblocked IPs and keeps
        # TLS end-to-end). If the network blocks the route (common for
        # Telegram data-centers behind DPI), fall back to:
        #   • port 443 → MITM + relay through Apps Script
        #   • port 80  → plain-HTTP relay through Apps Script
        #   • other    → give up (non-HTTP; can't be relayed)
        # We use a shorter connect timeout for IP literals (4 s) because
        # when the route is DPI-dropped, waiting longer doesn't help and
        # clients like Telegram speed up DC-rotation when we fail fast.
        # We remember per-IP failures for a short while so subsequent
        # connects skip the doomed direct attempt.
        if is_ip_literal(host):
            if not self._direct_temporarily_disabled(host):
                log.info("Direct tunnel → %s:%d (IP literal)", host, port)
                ok = await self._do_direct_tunnel(
                    host, port, reader, writer, timeout=4.0,
                )
                if ok:
                    return
                self._remember_direct_failure(host, ttl=300)
                if port not in (80, 443):
                    log.warning("Direct tunnel failed for %s:%d", host, port)
                    return
                log.warning(
                    "Direct tunnel fallback → %s:%d (switching to relay)",
                    host, port,
                )
            else:
                log.info(
                    "Relay fallback → %s:%d (direct temporarily disabled)",
                    host, port,
                )
            if port == 443:
                await self._do_mitm_connect(host, port, reader, writer)
            elif port == 80:
                await self._do_plain_http_tunnel(host, port, reader, writer)
            return

        override_ip = self._sni_rewrite_ip(host)
        if override_ip:
            # SNI-blocked domain: MITM-decrypt from browser, then
            # re-connect to the override IP with SNI=front_domain so
            # the ISP never sees the blocked hostname in the TLS handshake.
            log.info("SNI-rewrite tunnel → %s via %s (SNI: %s)",
                     host, override_ip, self.fronter.sni_host)
            await self._do_sni_rewrite_tunnel(host, port, reader, writer,
                                              connect_ip=override_ip)
        elif self._is_google_domain(host):
            if self._direct_temporarily_disabled(host):
                log.info("Relay fallback → %s (direct tunnel temporarily disabled)", host)
                if port == 443:
                    await self._do_mitm_connect(host, port, reader, writer)
                else:
                    await self._do_plain_http_tunnel(host, port, reader, writer)
                return
            log.info("Direct tunnel → %s (Google domain, skipping relay)", host)
            ok = await self._do_direct_tunnel(host, port, reader, writer)
            if ok:
                return
            self._remember_direct_failure(host)
            log.warning("Direct tunnel fallback → %s (switching to relay)", host)
            if port == 443:
                await self._do_mitm_connect(host, port, reader, writer)
            else:
                await self._do_plain_http_tunnel(host, port, reader, writer)
        elif port == 443:
            await self._do_mitm_connect(host, port, reader, writer)
        elif port == 80:
            await self._do_plain_http_tunnel(host, port, reader, writer)
        else:
            # Non-HTTP port (e.g. mtalk:5228 XMPP, IMAP, SMTP, SSH) —
            # payload isn't HTTP, so we can't relay or MITM. Tunnel bytes.
            log.info("Direct tunnel → %s:%d (non-HTTP port)", host, port)
            ok = await self._do_direct_tunnel(host, port, reader, writer)
            if not ok:
                log.warning("Direct tunnel failed for %s:%d", host, port)

    # ── Hosts override (fake DNS) ─────────────────────────────────

    # Built-in list of domains that must be reached via Google's frontend IP
    # with SNI rewritten to `front_domain` (default: www.google.com).
    # Source: constants.SNI_REWRITE_SUFFIXES.
    # When youtube_via_relay is enabled the YouTube suffixes are removed so
    # YouTube goes through the Apps Script relay instead.
    _YOUTUBE_SNI_SUFFIXES = frozenset({
        "youtube.com", "youtu.be", "youtube-nocookie.com",
    })
    _SNI_REWRITE_SUFFIXES = SNI_REWRITE_SUFFIXES

    def _sni_rewrite_ip(self, host: str) -> str | None:
        """Return the IP to SNI-rewrite `host` through, or None.

        Order of precedence:
          1. Explicit entry in config `hosts` map (exact or suffix match).
          2. Built-in `_SNI_REWRITE_SUFFIXES` → mapped to config `google_ip`.
        """
        ip = self._hosts_ip(host)
        if ip:
            return ip
        h = host.lower().rstrip(".")
        for suffix in self._SNI_REWRITE_SUFFIXES:
            if h == suffix or h.endswith("." + suffix):
                return self.fronter.connect_host  # configured google_ip
        return None

    def _hosts_ip(self, host: str) -> str | None:
        """Return override IP for host if defined in config 'hosts', else None.

        Supports exact match and suffix match (e.g. 'youtube.com' matches
        'www.youtube.com', 'm.youtube.com', etc.).
        """
        h = host.lower().rstrip(".")
        if h in self._hosts:
            return self._hosts[h]
        # suffix match: check every parent label
        parts = h.split(".")
        for i in range(1, len(parts)):
            parent = ".".join(parts[i:])
            if parent in self._hosts:
                return self._hosts[parent]
        return None

    # ── Google domain detection ───────────────────────────────────

    # Google-owned domains that may use the raw direct-tunnel shortcut.
    # YouTube/googlevideo SNIs are blocked; they go through
    # _do_sni_rewrite_tunnel via the hosts map instead.
    # Source: constants.GOOGLE_OWNED_SUFFIXES / GOOGLE_OWNED_EXACT.
    _GOOGLE_OWNED_SUFFIXES = GOOGLE_OWNED_SUFFIXES
    _GOOGLE_OWNED_EXACT = GOOGLE_OWNED_EXACT

    def _is_google_domain(self, host: str) -> bool:
        """Return True if host should use the raw direct Google shortcut."""
        h = host.lower().rstrip(".")
        if self._is_direct_google_excluded(h):
            return False
        if not self._is_google_owned_domain(h):
            return False
        return self._is_direct_google_allowed(h)

    def _is_google_owned_domain(self, host: str) -> bool:
        """True if ``host`` is in the exact list or ends with an owned suffix.

        NOTE(review): suffixes are matched with a plain ``endswith``; this is
        only label-safe if the constants carry a leading dot — confirm in
        constants.py.
        """
        if host in self._GOOGLE_OWNED_EXACT:
            return True
        for suffix in self._GOOGLE_OWNED_SUFFIXES:
            if host.endswith(suffix):
                return True
        return False

    def _is_direct_google_excluded(self, host: str) -> bool:
        """True if ``host`` is excluded from the direct shortcut: exact match,
        built-in suffix match, or a configured leading-dot suffix token."""
        if host in self._direct_google_exclude:
            return True
        for suffix in self._GOOGLE_DIRECT_SUFFIX_EXCLUDE:
            if host.endswith(suffix):
                return True
        for token in self._direct_google_exclude:
            if token.startswith(".") and host.endswith(token):
                return True
        return False

    def _is_direct_google_allowed(self, host: str) -> bool:
        """True if ``host`` is allowed on the direct shortcut: exact match,
        built-in suffix match, or a configured leading-dot suffix token."""
        if host in self._direct_google_allow:
            return True
        for suffix in self._GOOGLE_DIRECT_ALLOW_SUFFIXES:
            if host.endswith(suffix):
                return True
        for token in self._direct_google_allow:
            if token.startswith(".") and host.endswith(token):
                return True
        return False

    def _direct_temporarily_disabled(self, host: str) -> bool:
        """True while any failure key for ``host`` is still within its TTL.
        Expired keys are dropped as a side effect."""
        h = host.lower().rstrip(".")
        now = time.time()
        disabled = False
        for key in self._direct_failure_keys(h):
            until = self._direct_fail_until.get(key, 0)
            if until > now:
                disabled = True
            else:
                self._direct_fail_until.pop(key, None)
        return disabled

    def _remember_direct_failure(self, host: str, ttl: int = 600):
        """Disable direct tunnelling for ``host`` (and its wildcard family
        key, if any) for ``ttl`` seconds."""
        until = time.time() + ttl
        for key in self._direct_failure_keys(host.lower().rstrip(".")):
            self._direct_fail_until[key] = until

    def _direct_failure_keys(self, host: str) -> tuple[str, ...]:
        """Return the failure-cache keys for ``host``: the host itself plus a
        shared wildcard key for the google.com / googleapis.com / gstatic.com
        / googleusercontent.com families."""
        keys = [host]
        if host.endswith(".google.com") or host == "google.com":
            keys.append("*.google.com")
        if host.endswith(".googleapis.com") or host == "googleapis.com":
            keys.append("*.googleapis.com")
        if host.endswith(".gstatic.com") or host == "gstatic.com":
            keys.append("*.gstatic.com")
        if host.endswith(".googleusercontent.com") or host == "googleusercontent.com":
            keys.append("*.googleusercontent.com")
        return tuple(dict.fromkeys(keys))

    async def _open_tcp_connection(self, target: str, port: int, timeout: float = 10.0):
        """Connect with IPv4-first resolution and clearer failure reporting.

        ``timeout`` applies separately to the DNS lookup and to each
        connection attempt. Raises OSError when the lookup fails or every
        candidate address fails to connect.
        """
        errors: list[str] = []
        loop = asyncio.get_running_loop()

        # Strip IPv6 brackets (CONNECT may deliver "[::1]" as the hostname).
        # ipaddress.ip_address() rejects the bracketed form, which would
        # otherwise force a DNS lookup for an IP literal and fail.
        lookup_target = target.strip()
        if lookup_target.startswith("[") and lookup_target.endswith("]"):
            lookup_target = lookup_target[1:-1]

        try:
            ipaddress.ip_address(lookup_target)
            candidates = [(0, lookup_target)]
        except ValueError:
            try:
                infos = await asyncio.wait_for(
                    loop.getaddrinfo(
                        lookup_target,
                        port,
                        family=socket.AF_UNSPEC,
                        type=socket.SOCK_STREAM,
                    ),
                    timeout=timeout,
                )
            except Exception as exc:
                raise OSError(f"dns lookup failed for {lookup_target}: {exc!r}") from exc

            candidates = []
            seen = set()
            for family, _type, _proto, _canon, sockaddr in infos:
                ip = sockaddr[0]
                key = (family, ip)
                if key in seen:
                    continue
                seen.add(key)
                candidates.append((family, ip))
            # Stable sort: IPv4 first, resolver order kept within a family.
            candidates.sort(key=lambda item: 0 if item[0] == socket.AF_INET else 1)

        for family, ip in candidates:
            try:
                return await asyncio.wait_for(
                    asyncio.open_connection(ip, port, family=family or 0),
                    timeout=timeout,
                )
            except Exception as exc:
                fam = "ipv4" if family == socket.AF_INET else (
                    "ipv6" if family == socket.AF_INET6 else "auto"
                )
                errors.append(f"{ip} ({fam}): {exc!r}")

        raise OSError("; ".join(errors) or f"connect failed for {target}:{port}")

    # ── Direct tunnel (no MITM) ───────────────────────────────────

    async def _do_direct_tunnel(self, host: str, port: int,
                                reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter,
                                connect_ip: str | None = None,
                                timeout: float | None = None):
        """Pipe raw TLS bytes directly to the target server.

        connect_ip overrides DNS: the TCP connection goes to that IP
        while the browser's TLS (SNI=host) is piped through unchanged.
        Without an override we connect to the real hostname so browser-safe
        Google properties (Gemini assets, Play, Accounts, etc.) use their
        normal edge instead of being forced onto the fronting IP.

        Returns False if the upstream connection could not be opened (nothing
        has been written to the client yet), True once piping has finished.
        """
        target_ip = connect_ip or host
        effective_timeout = (
            self._tcp_connect_timeout if timeout is None else float(timeout)
        )
        try:
            r_remote, w_remote = await self._open_tcp_connection(
                target_ip, port, timeout=effective_timeout,
            )
        except Exception as e:
            log.error("Direct tunnel connect failed (%s via %s): %s",
                      host, target_ip, e)
            return False

        async def pipe(src, dst, label):
            try:
                while True:
                    data = await src.read(65536)
                    if not data:
                        break
                    dst.write(data)
                    await dst.drain()
            except (ConnectionError, asyncio.CancelledError):
                pass
            except Exception as e:
                log.debug("Pipe %s ended: %s", label, e)
            finally:
                # Half-close rather than hard-close so the other direction
                # can still flush final bytes (important for TLS close_notify).
                try:
                    if not dst.is_closing() and dst.can_write_eof():
                        dst.write_eof()
                except Exception:
                    try:
                        dst.close()
                    except Exception:
                        pass

        try:
            await asyncio.gather(
                pipe(reader, w_remote, f"client→{host}"),
                pipe(r_remote, writer, f"{host}→client"),
            )
        finally:
            # The pipes only half-close; release the upstream socket once
            # both directions are done (or we were cancelled). The client
            # writer is closed by the connection handler that owns it.
            try:
                w_remote.close()
            except Exception:
                pass
        return True

    # ── SNI-rewrite tunnel ────────────────────────────────────────

    async def _do_sni_rewrite_tunnel(self, host: str, port: int, reader, writer,
                                     connect_ip: str | None = None):
        """MITM-decrypt TLS from browser, then re-encrypt toward connect_ip
        using SNI=front_domain (e.g. www.google.com).

        The ISP only ever sees SNI=www.google.com in the outgoing handshake,
        hiding the blocked hostname (e.g. www.youtube.com).
        """
        target_ip = connect_ip or self.fronter.connect_host
        sni_out = self.fronter.sni_host  # e.g. "www.google.com"

        # Step 1: MITM — accept TLS from the browser
        ssl_ctx_server = self.mitm.get_server_context(host)
        loop = asyncio.get_running_loop()
        transport = writer.transport
        protocol = transport.get_protocol()
        try:
            new_transport = await loop.start_tls(
                transport, protocol, ssl_ctx_server, server_side=True,
            )
        except Exception as e:
            log.debug("SNI-rewrite TLS accept failed (%s): %s", host, e)
            return
        # Point the existing writer at the upgraded TLS transport.
        writer._transport = new_transport

        # Step 2: open outgoing TLS to target IP with the safe SNI.
        # Reuse the SSLContext already built by DomainFronter (certifi bundle,
        # verify_ssl flag) — no need to rebuild it on every CONNECT.
        try:
            r_out, w_out = await asyncio.wait_for(
                asyncio.open_connection(
                    target_ip, port,
                    ssl=self.fronter._ssl_ctx(),
                    server_hostname=sni_out,
                ),
                timeout=self._tcp_connect_timeout,
            )
        except Exception as e:
            log.error("SNI-rewrite outbound connect failed (%s via %s): %s",
                      host, target_ip, e)
            return

        # Step 3: pipe application-layer bytes between the two TLS sessions
        async def pipe(src, dst, label):
            try:
                while True:
                    data = await src.read(65536)
                    if not data:
                        break
                    dst.write(data)
                    await dst.drain()
            except (ConnectionError, asyncio.CancelledError):
                pass
            except Exception as exc:
                log.debug("Pipe %s ended: %s", label, exc)
            finally:
                try:
                    dst.close()
                except Exception:
                    pass

        await asyncio.gather(
            pipe(reader, w_out, f"client→{host}"),
            pipe(r_out, writer, f"{host}→client"),
        )

    # ── MITM CONNECT (apps_script mode) ───────────────────────────

    async def _do_plain_http_tunnel(self, host: str, port: int, reader, writer):
        """Handle plain HTTP over SOCKS5 in apps_script mode."""
        log.info("Plain HTTP relay → %s:%d", host, port)
        await self._relay_http_stream(host, port, reader, writer)

    async def _do_mitm_connect(self, host: str, port: int, reader, writer):
        """Intercept TLS, decrypt HTTP, and relay through Apps Script."""
        ssl_ctx = self.mitm.get_server_context(host)

        # Upgrade the existing connection to TLS (we are the server)
        loop = asyncio.get_running_loop()
        transport = writer.transport
        protocol = transport.get_protocol()

        try:
            new_transport = await loop.start_tls(
                transport, protocol, ssl_ctx, server_side=True,
            )
        except Exception as e:
            # TLS handshake failed. Common causes:
            #   • Telegram Desktop / MTProto over port 443 sends obfuscated
            #     non-TLS bytes — we literally cannot decrypt these, and
            #     since the target IP is blocked we can't direct-tunnel
            #     either. Telegram will rotate to another DC on its own;
            #     failing fast here lets that happen sooner.
            #   • Client CONNECTs but never speaks TLS (some probes).
            if is_ip_literal(host) and port == 443:
                log.info(
                    "Non-TLS traffic on %s:%d (likely Telegram MTProto / "
                    "obfuscated protocol). This DC appears blocked; the "
                    "client should rotate to another endpoint shortly.",
                    host, port,
                )
            elif port != 443:
                log.debug(
                    "TLS handshake skipped for %s:%d (non-HTTPS): %s",
                    host, port, e,
                )
            else:
                log.debug("TLS handshake failed for %s: %s", host, e)
            # Close the client side so it fails fast and can retry, rather
            # than hanging on a half-open connection.
            try:
                if not writer.is_closing():
                    writer.close()
            except Exception:
                pass
            return

        # Update writer to use the new TLS transport
        writer._transport = new_transport

        await self._relay_http_stream(host, port, reader, writer)

    async def _relay_http_stream(self, host: str, port: int, reader, writer):
        """Read decrypted/origin-form HTTP requests and relay them.

        Loops over requests on a keep-alive connection until the client goes
        idle, disconnects, or sends something that cannot be relayed.
        """
        # Read and relay HTTP requests from the browser (now decrypted)
        while True:
            try:
                first_line = await asyncio.wait_for(
                    reader.readline(), timeout=CLIENT_IDLE_TIMEOUT
                )
                if not first_line:
                    break

                header_block = first_line
                oversized_headers = False
                while True:
                    line = await asyncio.wait_for(reader.readline(), timeout=10)
                    header_block += line
                    if len(header_block) > MAX_HEADER_BYTES:
                        oversized_headers = True
                        break
                    if line in (b"\r\n", b"\n", b""):
                        break

                # Reject truncated / oversized header blocks cleanly rather
                # than forwarding a half-parsed request to the relay — doing
                # so would send malformed JSON payloads to Apps Script and
                # leave the client hanging until its own timeout fires.
                if oversized_headers:
                    log.warning(
                        "MITM header block exceeds %d bytes — closing (%s)",
                        MAX_HEADER_BYTES, host,
                    )
                    try:
                        writer.write(
                            b"HTTP/1.1 431 Request Header Fields Too Large\r\n"
                            b"Connection: close\r\n"
                            b"Content-Length: 0\r\n\r\n"
                        )
                        await writer.drain()
                    except Exception:
                        pass
                    break

                # Read body
                body = b""
                if has_unsupported_transfer_encoding(header_block):
                    log.warning("Unsupported Transfer-Encoding → %s:%d", host, port)
                    writer.write(
                        b"HTTP/1.1 501 Not Implemented\r\n"
                        b"Connection: close\r\n"
                        b"Content-Length: 0\r\n\r\n"
                    )
                    await writer.drain()
                    break
                length = parse_content_length(header_block)
                if length > MAX_REQUEST_BODY_BYTES:
                    # Answer explicitly instead of just dropping the
                    # connection, like the 431/501 paths above.
                    log.warning(
                        "Request body too large: %d bytes (%s)", length, host,
                    )
                    try:
                        writer.write(
                            b"HTTP/1.1 413 Payload Too Large\r\n"
                            b"Connection: close\r\n"
                            b"Content-Length: 0\r\n\r\n"
                        )
                        await writer.drain()
                    except Exception:
                        pass
                    break
                if length > 0:
                    body = await reader.readexactly(length)

                # Parse the request
                request_line = first_line.decode(errors="replace").strip()
                parts = request_line.split(" ", 2)
                if len(parts) < 2:
                    break

                method = parts[0]
                path = parts[1]

                # Parse headers
                # splitlines() rather than split(b"\r\n"): the reader loop
                # above also accepts bare-LF line endings, and splitting on
                # CRLF only would silently drop every header of such a request.
                headers = {}
                for raw_line in header_block.splitlines()[1:]:
                    if b":" in raw_line:
                        k, v = raw_line.decode(errors="replace").split(":", 1)
                        headers[k.strip()] = v.strip()

                # Shortening the length of X API URLs to prevent relay errors.
                if (host == "x.com" or host == "twitter.com") and re.match(r"/i/api/graphql/[^/]+/[^?]+\?variables=", path):
                    path = path.split("&")[0]

                # MITM traffic arrives as origin-form paths; SOCKS/plain HTTP can
                # also send absolute-form requests. Normalize both to full URLs.
                if path.startswith("http://") or path.startswith("https://"):
                    url = path
                elif port == 443:
                    url = f"https://{host}{path}"
                elif port == 80:
                    url = f"http://{host}{path}"
                else:
                    url = f"http://{host}:{port}{path}"

                log.info("MITM → %s %s", method, url)

                # ── CORS: extract relevant request headers ─────────────
                origin = header_value(headers, "origin")
                acr_method = header_value(
                    headers, "access-control-request-method",
                )
                acr_headers = header_value(
                    headers, "access-control-request-headers",
                )

                # CORS preflight — respond directly. Apps Script's
                # UrlFetchApp does not support the OPTIONS method, so
                # forwarding preflights would always fail and break every
                # cross-origin fetch/XHR the browser runs through us.
                if method.upper() == "OPTIONS" and acr_method:
                    log.debug(
                        "CORS preflight → %s (responding locally)",
                        url[:60],
                    )
                    writer.write(cors_preflight_response(
                        origin, acr_method, acr_headers,
                    ))
                    await writer.drain()
                    continue

                if await self._maybe_stream_download(method, url, headers, body, writer):
                    continue

                # Check local cache first (GET only)
                response = None
                if self._cache_allowed(method, url, headers, body):
                    response = self._cache.get(url)
                    if response:
                        log.debug("Cache HIT: %s", url[:60])

                if response is None:
                    # Relay through Apps Script
                    try:
                        response = await self._relay_smart(method, url, headers, body)
                    except Exception as e:
                        log.error("Relay error (%s): %s", url[:60], e)
                        err_body = f"Relay error: {e}".encode()
                        response = (
                            b"HTTP/1.1 502 Bad Gateway\r\n"
                            b"Content-Type: text/plain\r\n"
                            b"Content-Length: " + str(len(err_body)).encode() + b"\r\n"
                            b"\r\n" + err_body
                        )

                    # Cache successful GET responses
                    if self._cache_allowed(method, url, headers, body) and response:
                        ttl = ResponseCache.parse_ttl(response, url)
                        if ttl > 0:
                            self._cache.put(url, response, ttl)
                            log.debug("Cached (%ds): %s", ttl, url[:60])

                # Inject permissive CORS headers whenever the browser sent
                # an Origin (cross-origin XHR / fetch). Without this, the
                # browser blocks the response even though the relay fetched
                # it successfully.
                if origin and response:
                    response = inject_cors_headers(response, origin)

                log_response_summary(
                    logger=log,
                    split_raw_response=split_raw_response,
                    trace_suffixes=self._TRACE_HOST_SUFFIXES,
                    url=url,
                    response=response,
                )

                writer.write(response)
                await writer.drain()

            except asyncio.TimeoutError:
                break
            except asyncio.IncompleteReadError:
                break
            except ConnectionError:
                break
            except Exception as e:
                log.error("MITM handler error (%s): %s", host, e)
                break

    # ── CORS helpers ──────────────────────────────────────────────
    # cors_preflight_response() and inject_cors_headers() live in proxy_support.

    def _url_matches_relay_pattern(self, url: str) -> bool:
        """Return True if url matches any entry in _relay_url_patterns.

        Pattern format: "host/path" (no scheme). The url host may have extra
        subdomains (e.g. www.youtube.com matches youtube.com).
        """
        normalized = re.sub(r'^https?://', '', url).lower()
        slash = normalized.find('/')
        url_host = normalized[:slash] if slash != -1 else normalized
        url_path = normalized[slash:] if slash != -1 else '/'
        for p in self._relay_url_patterns:
            slash_p = p.find('/')
            pat_host = p[:slash_p] if slash_p != -1 else p
            pat_path = p[slash_p:] if slash_p != -1 else '/'
            host_match = (url_host == pat_host or url_host.endswith('.' + pat_host))
            if host_match and url_path.startswith(pat_path):
                return True
        return False

    async def _forward_via_sni_rewrite(self, method: str, url: str,
                                       headers: dict, body: bytes) -> bytes:
        """Forward an HTTP request to its real origin via the SNI-rewrite path.

        Connects to google_ip:443 with SNI=front_domain (DPI only sees a safe
        Google SNI), then sends the actual HTTP/1.1 request with the real Host
        header so YouTube's edge serves the correct response.
        """
        # Parse host and path from URL.
        stripped = re.sub(r'^https?://', '', url)
        slash = stripped.find('/')
        if slash == -1:
            host = stripped
            path = '/'
        else:
            host = stripped[:slash]
            path = stripped[slash:]

        # Build HTTP/1.1 request bytes.
req_headers = dict(headers) req_headers['Host'] = host # Use Connection: close so we don't need to manage keep-alive. req_headers['Connection'] = 'close' req_lines = [f"{method} {path} HTTP/1.1\r\n"] for k, v in req_headers.items(): req_lines.append(f"{k}: {v}\r\n") req_lines.append("\r\n") request_bytes = "".join(req_lines).encode() + (body or b"") r, w = await asyncio.wait_for( asyncio.open_connection( self.fronter.connect_host, 443, ssl=self.fronter._ssl_ctx(), server_hostname=self.fronter.sni_host, ), timeout=self._tcp_connect_timeout, ) try: w.write(request_bytes) await w.drain() chunks = [] while True: chunk = await asyncio.wait_for(r.read(65536), timeout=30) if not chunk: break chunks.append(chunk) finally: try: w.close() except Exception: pass return b"".join(chunks) async def _relay_smart(self, method, url, headers, body): """Choose optimal relay strategy based on request type. - If relay_url_patterns are configured and the URL does NOT match, forward via SNI-rewrite HTTP (fast direct path). - GET requests for likely-large downloads use parallel-range relay. - All other requests go through the single-request relay. """ # Path-level relay routing: only matching URL prefixes go through relay; # everything else on the same host is forwarded via SNI-rewrite. if self._relay_url_patterns and not self._url_matches_relay_pattern(url): # Check if this host is one we pulled out of SNI-rewrite. stripped = re.sub(r'^https?://', '', url).lower() slash = stripped.find('/') req_host = stripped[:slash] if slash != -1 else stripped pattern_hosts = {p.split('/')[0] for p in self._relay_url_patterns} host_covered = any( req_host == h or req_host.endswith('.' + h) for h in pattern_hosts ) if host_covered: return await self._forward_via_sni_rewrite(method, url, headers, body) if method == "GET" and not body: # Respect client's own Range header verbatim. 
if header_value(headers, "range"): return await self.fronter.relay(method, url, headers, body) # Only probe with Range when the URL looks like a big file. if self._is_likely_download(url, headers): return await self.fronter.relay_parallel( method, url, headers, body, chunk_size=self._download_chunk_size, max_parallel=self._download_max_parallel, max_chunks=self._download_max_chunks, min_size=self._download_min_size, ) return await self.fronter.relay(method, url, headers, body) def _is_likely_download(self, url: str, headers: dict) -> bool: """Heuristic: is this URL likely a large file download?""" path = url.split("?")[0].lower() if self._download_any_extension: return True for ext in self._download_extensions: if path.endswith(ext): return True accept = header_value(headers, "accept").lower() if any(marker in accept for marker in self._DOWNLOAD_ACCEPT_MARKERS): return True return False async def _maybe_stream_download(self, method: str, url: str, headers: dict | None, body: bytes, writer) -> bool: if method.upper() != "GET" or body: return False if header_value(headers, "range"): return False effective_headers = headers or {} if not self._is_likely_download(url, effective_headers): return False if not self.fronter.stream_download_allowed(url): return False return await self.fronter.stream_parallel_download( url, effective_headers, writer, chunk_size=self._download_chunk_size, max_parallel=self._download_max_parallel, max_chunks=self._download_max_chunks, min_size=self._download_min_size, ) # ── Plain HTTP forwarding ───────────────────────────────────── async def _do_http(self, header_block: bytes, reader, writer): body = b"" if has_unsupported_transfer_encoding(header_block): log.warning("Unsupported Transfer-Encoding on plain HTTP request") writer.write( b"HTTP/1.1 501 Not Implemented\r\n" b"Connection: close\r\n" b"Content-Length: 0\r\n\r\n" ) await writer.drain() return length = parse_content_length(header_block) if length > MAX_REQUEST_BODY_BYTES: 
writer.write(b"HTTP/1.1 413 Content Too Large\r\n\r\n") await writer.drain() return if length > 0: body = await reader.readexactly(length) first_line = header_block.split(b"\r\n")[0].decode(errors="replace") log.info("HTTP → %s", first_line) # Parse request and relay through Apps Script parts = first_line.strip().split(" ", 2) method = parts[0] if parts else "GET" url = parts[1] if len(parts) > 1 else "/" headers = {} for raw_line in header_block.split(b"\r\n")[1:]: if b":" in raw_line: k, v = raw_line.decode(errors="replace").split(":", 1) headers[k.strip()] = v.strip() # ── CORS preflight over plain HTTP ───────────────────────────── origin = header_value(headers, "origin") acr_method = header_value(headers, "access-control-request-method") acr_headers = header_value(headers, "access-control-request-headers") if method.upper() == "OPTIONS" and acr_method: log.debug("CORS preflight (HTTP) → %s (responding locally)", url[:60]) writer.write(cors_preflight_response( origin, acr_method, acr_headers, )) await writer.drain() return if await self._maybe_stream_download(method, url, headers, body, writer): return # Cache check for GET response = None if self._cache_allowed(method, url, headers, body): response = self._cache.get(url) if response: log.debug("Cache HIT (HTTP): %s", url[:60]) if response is None: response = await self._relay_smart(method, url, headers, body) # Cache successful GET if self._cache_allowed(method, url, headers, body) and response: ttl = ResponseCache.parse_ttl(response, url) if ttl > 0: self._cache.put(url, response, ttl) if origin and response: response = inject_cors_headers(response, origin) log_response_summary( logger=log, split_raw_response=split_raw_response, trace_suffixes=self._TRACE_HOST_SUFFIXES, url=url, response=response, ) writer.write(response) await writer.drain()