diff --git a/config.example.json b/config.example.json index 387a0ed..9f0d46d 100644 --- a/config.example.json +++ b/config.example.json @@ -7,8 +7,15 @@ "auth_key": "CHANGE_ME_TO_A_STRONG_SECRET", "listen_host": "127.0.0.1", "listen_port": 8085, + "socks5_enabled": true, + "socks5_host": "127.0.0.1", + "socks5_port": 1080, "log_level": "INFO", "verify_ssl": true, + "_direct_google_exclude_comment": "Google web apps that should NEVER use the raw direct-tunnel shortcut. Supports exact hosts and optional suffix patterns like \".googleapis.com\". They will go through the MITM relay path instead for better compatibility.", + "direct_google_exclude": ["gemini.google.com", "aistudio.google.com", "notebooklm.google.com", "labs.google.com", "meet.google.com", "accounts.google.com", "ogs.google.com", "mail.google.com", "calendar.google.com", "drive.google.com", "docs.google.com", "chat.google.com"], + "_direct_google_allow_comment": "Conservative allowlist for raw direct Google tunneling. Leave empty unless you have confirmed a host works better direct than via relay.", + "direct_google_allow": ["www.google.com", "safebrowsing.google.com"], "_hosts_comment": "Optional SNI-rewrite overrides. YouTube, googlevideo, gstatic, fonts.googleapis.com, ytimg, ggpht, doubleclick, etc. are ALREADY handled automatically (routed via google_ip with SNI=front_domain, same trick as the Xray MITM-DomainFronting config). Add entries here only for custom domains, e.g. \"example.com\": \"216.239.38.120\".", "hosts": {} } diff --git a/domain_fronter.py b/domain_fronter.py index a623cab..862c1ac 100644 --- a/domain_fronter.py +++ b/domain_fronter.py @@ -19,6 +19,7 @@ Mode 4 (apps_script): import asyncio import base64 +import hashlib import gzip import json import logging @@ -34,6 +35,12 @@ log = logging.getLogger("Fronter") class DomainFronter: + _STATIC_EXTS = ( + ".css", ".js", ".mjs", ".woff", ".woff2", ".ttf", ".eot", + ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico", + ".mp3", ".mp4", ".webm", ".wasm", ".avif", + ) + def __init__(self, config: dict): mode = config.get("mode", "domain_fronting") @@ -170,9 +177,34 @@ class DomainFronter: self._script_idx += 1 return sid - def _exec_path(self) -> str: - """Get the next Apps Script endpoint path (/dev or /exec).""" - sid = self._next_script_id() + @staticmethod + def _host_key(url_or_host: str | None) -> str: + """Return a stable routing key for a URL or host string.""" + if not url_or_host: + return "" + parsed = urlparse(url_or_host if "://" in url_or_host else f"https://{url_or_host}") + host = parsed.hostname or url_or_host + return host.lower().rstrip(".") + + def _script_id_for_key(self, key: str | None = None) -> str: + """Pick a stable Apps Script ID for a host or fallback to round-robin. + + When multiple deployments are configured, using a stable mapping per + host reduces IP/session churn for sites that are sensitive to endpoint + changes. If no key is available, we keep the older round-robin fallback + so warmup/keepalive traffic still distributes normally. + """ + if len(self._script_ids) == 1: + return self._script_ids[0] + if not key: + return self._next_script_id() + digest = hashlib.sha1(key.encode("utf-8")).digest() + idx = int.from_bytes(digest[:4], "big") % len(self._script_ids) + return self._script_ids[idx] + + def _exec_path(self, url_or_host: str | None = None) -> str: + """Get the Apps Script endpoint path (/dev or /exec).""" + sid = self._script_id_for_key(self._host_key(url_or_host)) return f"/macros/s/{sid}/{'dev' if self._dev_available else 'exec'}" async def _flush_pool(self): @@ -332,7 +364,7 @@ class DomainFronter: # Apps Script keepalive — warm the container payload = {"m": "HEAD", "u": "http://example.com/", "k": self.auth_key} - path = self._exec_path() + path = self._exec_path("example.com") t0 = time.perf_counter() await asyncio.wait_for( self._h2.request( @@ -514,6 +546,11 @@ class DomainFronter: payload = self._build_payload(method, url, headers, body) + # Stateful/browser-navigation requests should preserve exact ordering + # and header context; batching/coalescing is reserved for static fetches. + if self._is_stateful_request(method, url, headers, body): + return await self._relay_with_retry(payload) + # Coalesce concurrent GETs for the same URL. # CRITICAL: do NOT coalesce when a Range header is present — # parallel range downloads MUST each hit the server independently. @@ -711,7 +748,8 @@ class DomainFronter: payload = { "m": method, "u": url, - "r": True, + # Let the browser/app see origin redirects and cookies directly. + "r": False, } if headers: # Strip Accept-Encoding: Apps Script auto-decompresses gzip @@ -726,6 +764,46 @@ class DomainFronter: payload["ct"] = ct return payload + @classmethod + def _is_static_asset_url(cls, url: str) -> bool: + path = urlparse(url).path.lower() + return any(path.endswith(ext) for ext in cls._STATIC_EXTS) + + @staticmethod + def _header_value(headers: dict | None, name: str) -> str: + if not headers: + return "" + for key, value in headers.items(): + if key.lower() == name: + return str(value) + return "" + + @classmethod + def _is_stateful_request(cls, method: str, url: str, + headers: dict | None, body: bytes) -> bool: + method = method.upper() + if method not in {"GET", "HEAD"} or body: + return True + + if headers: + for name in ( + "cookie", "authorization", "proxy-authorization", + "origin", "referer", "if-none-match", "if-modified-since", + "cache-control", "pragma", + ): + if cls._header_value(headers, name): + return True + + accept = cls._header_value(headers, "accept").lower() + if "text/html" in accept or "application/json" in accept: + return True + + fetch_mode = cls._header_value(headers, "sec-fetch-mode").lower() + if fetch_mode in {"navigate", "cors"}: + return True + + return not cls._is_static_asset_url(url) + # ── Batch collector ─────────────────────────────────────────── async def _batch_submit(self, payload: dict) -> bytes: @@ -866,7 +944,7 @@ class DomainFronter: full_payload["k"] = self.auth_key json_body = json.dumps(full_payload).encode() - path = self._exec_path() + path = self._exec_path(payload.get("u")) status, headers, body = await self._h2.request( method="POST", path=path, host=self.http_host, @@ -883,7 +961,7 @@ class DomainFronter: full_payload["k"] = self.auth_key json_body = json.dumps(full_payload).encode() - path = self._exec_path() + path = self._exec_path(payload.get("u")) reader, writer, created = await self._acquire() try: @@ -911,14 +989,22 @@ class DomainFronter: parsed = urlparse(location) rpath = parsed.path + ("?" + parsed.query if parsed.query else "") - request = ( - f"GET {rpath} HTTP/1.1\r\n" - f"Host: {parsed.netloc}\r\n" - f"Accept-Encoding: gzip\r\n" - f"Connection: keep-alive\r\n" - f"\r\n" - ) - writer.write(request.encode()) + if status in (307, 308): + redirect_method = "POST" + redirect_body = json_body + else: + redirect_method = "GET" + redirect_body = b"" + request_lines = [ + f"{redirect_method} {rpath} HTTP/1.1", + f"Host: {parsed.netloc}", + "Accept-Encoding: gzip", + "Connection: keep-alive", + ] + if redirect_body: + request_lines.append(f"Content-Length: {len(redirect_body)}") + request = "\r\n".join(request_lines) + "\r\n\r\n" + writer.write(request.encode() + redirect_body) await writer.drain() status, resp_headers, resp_body = await self._read_http_response(reader) @@ -939,7 +1025,7 @@ class DomainFronter: "q": payloads, } json_body = json.dumps(batch_payload).encode() - path = self._exec_path() + path = self._exec_path(payloads[0].get("u") if payloads else None) # Try HTTP/2 first if self._h2 and self._h2.is_connected: @@ -983,14 +1069,22 @@ class DomainFronter: break parsed = urlparse(location) rpath = parsed.path + ("?" + parsed.query if parsed.query else "") - request = ( - f"GET {rpath} HTTP/1.1\r\n" - f"Host: {parsed.netloc}\r\n" - f"Accept-Encoding: gzip\r\n" - f"Connection: keep-alive\r\n" - f"\r\n" - ) - writer.write(request.encode()) + if status in (307, 308): + redirect_method = "POST" + redirect_body = json_body + else: + redirect_method = "GET" + redirect_body = b"" + request_lines = [ + f"{redirect_method} {rpath} HTTP/1.1", + f"Host: {parsed.netloc}", + "Accept-Encoding: gzip", + "Connection: keep-alive", + ] + if redirect_body: + request_lines.append(f"Content-Length: {len(redirect_body)}") + request = "\r\n".join(request_lines) + "\r\n\r\n" + writer.write(request.encode() + redirect_body) await writer.drain() status, resp_headers, resp_body = await self._read_http_response(reader) diff --git a/main.py b/main.py index d80add6..a191b58 100644 --- a/main.py +++ b/main.py @@ -51,6 +51,17 @@ def parse_args(): default=None, help="Override listen host (env: DFT_HOST)", ) + parser.add_argument( + "--socks5-port", + type=int, + default=None, + help="Override SOCKS5 listen port (env: DFT_SOCKS5_PORT)", + ) + parser.add_argument( + "--disable-socks5", + action="store_true", + help="Disable the built-in SOCKS5 listener.", + ) parser.add_argument( "--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], @@ -107,6 +118,14 @@ def main(): elif os.environ.get("DFT_HOST"): config["listen_host"] = os.environ["DFT_HOST"] + if args.socks5_port is not None: + config["socks5_port"] = args.socks5_port + elif os.environ.get("DFT_SOCKS5_PORT"): + config["socks5_port"] = int(os.environ["DFT_SOCKS5_PORT"]) + + if args.disable_socks5: + config["socks5_enabled"] = False + if args.log_level is not None: config["log_level"] = args.log_level elif os.environ.get("DFT_LOG_LEVEL"): @@ -162,7 +181,7 @@ def main(): config.get("front_domain", "www.google.com")) script_ids = config.get("script_ids") or config.get("script_id") if isinstance(script_ids, list): - log.info("Script IDs : %d scripts (round-robin)", len(script_ids)) + log.info("Script IDs : %d scripts (sticky per-host)", len(script_ids)) for i, sid in enumerate(script_ids): log.info(" [%d] %s", i + 1, sid) else: @@ -192,7 +211,13 @@ def main(): log.info("Front domain (SNI) : %s", config.get("front_domain", "?")) log.info("Worker host (Host) : %s", config.get("worker_host", "?")) - log.info("Proxy address : %s:%d", config.get("listen_host", "127.0.0.1"), config.get("listen_port", 8080)) + log.info("HTTP proxy : %s:%d", + config.get("listen_host", "127.0.0.1"), + config.get("listen_port", 8080)) + if config.get("socks5_enabled", True): + log.info("SOCKS5 proxy : %s:%d", + config.get("socks5_host", config.get("listen_host", "127.0.0.1")), + config.get("socks5_port", 1080)) try: asyncio.run(ProxyServer(config).start()) diff --git a/proxy_server.py b/proxy_server.py index fb9b1d8..ea9117c 100644 --- a/proxy_server.py +++ b/proxy_server.py @@ -12,8 +12,11 @@ Supports: import asyncio import logging import re +import socket import ssl import time +import ipaddress +from urllib.parse import urlparse from domain_fronter import DomainFronter @@ -69,7 +72,7 @@ class ResponseCache: # Don't cache errors or non-200 if b"HTTP/1.1 200" not in raw_response[:20]: return 0 - if "no-store" in hdr: + if "no-store" in hdr or "private" in hdr or "set-cookie:" in hdr: return 0 # Explicit max-age @@ -101,13 +104,57 @@ class ResponseCache: class ProxyServer: + _GOOGLE_DIRECT_EXACT_EXCLUDE = { + "gemini.google.com", + "aistudio.google.com", + "notebooklm.google.com", + "labs.google.com", + "meet.google.com", + "accounts.google.com", + "ogs.google.com", + "mail.google.com", + "calendar.google.com", + "drive.google.com", + "docs.google.com", + "chat.google.com", + "photos.google.com", + "maps.google.com", + "myaccount.google.com", + "contacts.google.com", + "classroom.google.com", + "keep.google.com", + "play.google.com", + } + _GOOGLE_DIRECT_SUFFIX_EXCLUDE = ( + ".meet.google.com", + ) + _GOOGLE_DIRECT_ALLOW_EXACT = { + "www.google.com", + "google.com", + "safebrowsing.google.com", + } + _GOOGLE_DIRECT_ALLOW_SUFFIXES = () + _TRACE_HOST_SUFFIXES = ( + "chatgpt.com", + "openai.com", + "gemini.google.com", + "google.com", + "cloudflare.com", + "challenges.cloudflare.com", + "turnstile", + ) + def __init__(self, config: dict): self.host = config.get("listen_host", "127.0.0.1") self.port = config.get("listen_port", 8080) + self.socks_enabled = config.get("socks5_enabled", True) + self.socks_host = config.get("socks5_host", self.host) + self.socks_port = config.get("socks5_port", 1080) self.mode = config.get("mode", "domain_fronting") self.fronter = DomainFronter(config) self.mitm = None self._cache = ResponseCache(max_mb=50) + self._direct_fail_until: dict[str, float] = {} # Persistent HTTP tunnel cache for google_fronting mode # Key: "host:port" → (tunnel_reader, tunnel_writer, lock) @@ -117,6 +164,22 @@ class ProxyServer: # hosts override — DNS fake-map: domain/suffix → IP # Checked before any real DNS lookup; supports exact and suffix matching. self._hosts: dict[str, str] = config.get("hosts", {}) + configured_direct_exclude = config.get("direct_google_exclude", []) + self._direct_google_exclude = { + h.lower().rstrip(".") + for h in ( + list(self._GOOGLE_DIRECT_EXACT_EXCLUDE) + + list(configured_direct_exclude) + ) + } + configured_direct_allow = config.get("direct_google_allow", []) + self._direct_google_allow = { + h.lower().rstrip(".") + for h in ( + list(self._GOOGLE_DIRECT_ALLOW_EXACT) + + list(configured_direct_allow) + ) + } if self.mode == "apps_script": try: @@ -127,14 +190,94 @@ class ProxyServer: log.error("Run: pip install cryptography") raise SystemExit(1) + @staticmethod + def _header_value(headers: dict | None, name: str) -> str: + if not headers: + return "" + for key, value in headers.items(): + if key.lower() == name: + return str(value) + return "" + + def _cache_allowed(self, method: str, url: str, + headers: dict | None, body: bytes) -> bool: + if method.upper() != "GET" or body: + return False + for name in ( + "cookie", "authorization", "proxy-authorization", "range", + "if-none-match", "if-modified-since", "cache-control", "pragma", + ): + if self._header_value(headers, name): + return False + return self.fronter._is_static_asset_url(url) + + @classmethod + def _should_trace_host(cls, host: str) -> bool: + h = host.lower().rstrip(".") + return any( + token == h or token in h or h.endswith("." + token) + for token in cls._TRACE_HOST_SUFFIXES + ) + + def _log_response_summary(self, url: str, response: bytes): + status, headers, body = self.fronter._split_raw_response(response) + host = (urlparse(url).hostname or "").lower() + if status >= 300 or self._should_trace_host(host): + location = headers.get("location", "") + server = headers.get("server", "") + cf_ray = headers.get("cf-ray", "") + content_type = headers.get("content-type", "") + body_len = len(body) + body_hint = "-" + if "text/html" in content_type.lower() and body: + sample = body[:800].decode(errors="replace").lower() + if "" in sample and "" in sample: + title = sample.split("", 1)[1].split("", 1)[0] + body_hint = title[:120] + elif "captcha" in sample: + body_hint = "captcha" + elif "turnstile" in sample: + body_hint = "turnstile" + elif "loading" in sample: + body_hint = "loading" + log.info( + "RESP ← %s status=%s type=%s len=%s server=%s location=%s cf-ray=%s hint=%s", + host or url[:60], status, content_type or "-", body_len, + server or "-", location or "-", cf_ray or "-", body_hint, + ) + async def start(self): - srv = await asyncio.start_server(self._on_client, self.host, self.port) + http_srv = await asyncio.start_server(self._on_client, self.host, self.port) + socks_srv = None + + if self.socks_enabled: + try: + socks_srv = await asyncio.start_server( + self._on_socks_client, self.socks_host, self.socks_port + ) + except OSError as e: + log.error("SOCKS5 listener failed on %s:%d: %s", + self.socks_host, self.socks_port, e) + log.info( - "Listening on %s:%d — configure your browser HTTP proxy to this address", + "HTTP proxy listening on %s:%d", self.host, self.port, ) - async with srv: - await srv.serve_forever() + if socks_srv: + log.info( + "SOCKS5 proxy listening on %s:%d", + self.socks_host, self.socks_port, + ) + + async with http_srv: + if socks_srv: + async with socks_srv: + await asyncio.gather( + http_srv.serve_forever(), + socks_srv.serve_forever(), + ) + else: + await http_srv.serve_forever() # ── client handler ──────────────────────────────────────────── @@ -176,6 +319,69 @@ class ProxyServer: except Exception: pass + async def _on_socks_client(self, reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): + addr = writer.get_extra_info("peername") + try: + header = await asyncio.wait_for(reader.readexactly(2), timeout=15) + ver, nmethods = header[0], header[1] + if ver != 5: + return + + methods = await asyncio.wait_for(reader.readexactly(nmethods), timeout=10) + if 0x00 not in methods: + writer.write(b"\x05\xff") + await writer.drain() + return + + writer.write(b"\x05\x00") + await writer.drain() + + req = await asyncio.wait_for(reader.readexactly(4), timeout=15) + ver, cmd, _rsv, atyp = req + if ver != 5 or cmd != 0x01: + writer.write(b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00") + await writer.drain() + return + + if atyp == 0x01: + raw = await asyncio.wait_for(reader.readexactly(4), timeout=10) + host = socket.inet_ntoa(raw) + elif atyp == 0x03: + ln = (await asyncio.wait_for(reader.readexactly(1), timeout=10))[0] + host = (await asyncio.wait_for(reader.readexactly(ln), timeout=10)).decode( + errors="replace" + ) + elif atyp == 0x04: + raw = await asyncio.wait_for(reader.readexactly(16), timeout=10) + host = socket.inet_ntop(socket.AF_INET6, raw) + else: + writer.write(b"\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00") + await writer.drain() + return + + port_raw = await asyncio.wait_for(reader.readexactly(2), timeout=10) + port = int.from_bytes(port_raw, "big") + + log.info("SOCKS5 CONNECT → %s:%d", host, port) + + writer.write(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00") + await writer.drain() + await self._handle_target_tunnel(host, port, reader, writer) + + except asyncio.IncompleteReadError: + pass + except asyncio.TimeoutError: + log.debug("SOCKS5 timeout: %s", addr) + except Exception as e: + log.error("SOCKS5 error (%s): %s", addr, e) + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + # ── CONNECT (HTTPS tunnelling) ──────────────────────────────── async def _do_connect(self, target: str, reader, writer): @@ -189,6 +395,13 @@ class ProxyServer: writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n") await writer.drain() + await self._handle_target_tunnel(host, port, reader, writer) + + async def _handle_target_tunnel(self, host: str, port: int, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): + """Route a target connection through the active relay mode.""" + if self.mode == "apps_script": override_ip = self._sni_rewrite_ip(host) if override_ip: @@ -200,10 +413,29 @@ class ProxyServer: await self._do_sni_rewrite_tunnel(host, port, reader, writer, connect_ip=override_ip) elif self._is_google_domain(host): + if self._direct_temporarily_disabled(host): + log.info("Relay fallback → %s (direct tunnel temporarily disabled)", host) + if port == 443: + await self._do_mitm_connect(host, port, reader, writer) + else: + await self._do_plain_http_tunnel(host, port, reader, writer) + return + log.info("Direct tunnel → %s (Google domain, skipping relay)", host) - await self._do_direct_tunnel(host, port, reader, writer) - else: + ok = await self._do_direct_tunnel(host, port, reader, writer) + if ok: + return + + self._remember_direct_failure(host) + log.warning("Direct tunnel fallback → %s (switching to relay)", host) + if port == 443: + await self._do_mitm_connect(host, port, reader, writer) + else: + await self._do_plain_http_tunnel(host, port, reader, writer) + elif port == 443: await self._do_mitm_connect(host, port, reader, writer) + else: + await self._do_plain_http_tunnel(host, port, reader, writer) else: await self.fronter.tunnel(host, port, reader, writer) @@ -271,25 +503,132 @@ class ProxyServer: # Only domains whose SNI the ISP does NOT block — direct tunnel is safe. # YouTube/googlevideo SNIs are blocked; they go through _do_sni_rewrite_tunnel # via the hosts map instead. - _GOOGLE_SUFFIXES = ( + _GOOGLE_OWNED_SUFFIXES = ( ".google.com", ".google.co", ".googleapis.com", ".gstatic.com", ".googleusercontent.com", ) - _GOOGLE_EXACT = { + _GOOGLE_OWNED_EXACT = { "google.com", "gstatic.com", "googleapis.com", } def _is_google_domain(self, host: str) -> bool: - """Return True if host is a Google-owned domain.""" + """Return True if host should use the raw direct Google shortcut.""" h = host.lower().rstrip(".") - if h in self._GOOGLE_EXACT: + if self._is_direct_google_excluded(h): + return False + if not self._is_google_owned_domain(h): + return False + return self._is_direct_google_allowed(h) + + def _is_google_owned_domain(self, host: str) -> bool: + if host in self._GOOGLE_OWNED_EXACT: return True - for suffix in self._GOOGLE_SUFFIXES: - if h.endswith(suffix): + for suffix in self._GOOGLE_OWNED_SUFFIXES: + if host.endswith(suffix): return True return False + def _is_direct_google_excluded(self, host: str) -> bool: + if host in self._direct_google_exclude: + return True + for suffix in self._GOOGLE_DIRECT_SUFFIX_EXCLUDE: + if host.endswith(suffix): + return True + for token in self._direct_google_exclude: + if token.startswith(".") and host.endswith(token): + return True + return False + + def _is_direct_google_allowed(self, host: str) -> bool: + if host in self._direct_google_allow: + return True + for suffix in self._GOOGLE_DIRECT_ALLOW_SUFFIXES: + if host.endswith(suffix): + return True + for token in self._direct_google_allow: + if token.startswith(".") and host.endswith(token): + return True + return False + + def _direct_temporarily_disabled(self, host: str) -> bool: + h = host.lower().rstrip(".") + now = time.time() + disabled = False + for key in self._direct_failure_keys(h): + until = self._direct_fail_until.get(key, 0) + if until > now: + disabled = True + else: + self._direct_fail_until.pop(key, None) + return disabled + + def _remember_direct_failure(self, host: str, ttl: int = 600): + until = time.time() + ttl + for key in self._direct_failure_keys(host.lower().rstrip(".")): + self._direct_fail_until[key] = until + + def _direct_failure_keys(self, host: str) -> tuple[str, ...]: + keys = [host] + if host.endswith(".google.com") or host == "google.com": + keys.append("*.google.com") + if host.endswith(".googleapis.com") or host == "googleapis.com": + keys.append("*.googleapis.com") + if host.endswith(".gstatic.com") or host == "gstatic.com": + keys.append("*.gstatic.com") + if host.endswith(".googleusercontent.com") or host == "googleusercontent.com": + keys.append("*.googleusercontent.com") + return tuple(dict.fromkeys(keys)) + + async def _open_tcp_connection(self, target: str, port: int, + timeout: float = 10.0): + """Connect with IPv4-first resolution and clearer failure reporting.""" + errors: list[str] = [] + loop = asyncio.get_running_loop() + + try: + ipaddress.ip_address(target) + candidates = [(0, target)] + except ValueError: + try: + infos = await asyncio.wait_for( + loop.getaddrinfo( + target, + port, + family=socket.AF_UNSPEC, + type=socket.SOCK_STREAM, + ), + timeout=timeout, + ) + except Exception as exc: + raise OSError(f"dns lookup failed for {target}: {exc!r}") from exc + + candidates = [] + seen = set() + for family, _type, _proto, _canon, sockaddr in infos: + ip = sockaddr[0] + key = (family, ip) + if key in seen: + continue + seen.add(key) + candidates.append((family, ip)) + + candidates.sort(key=lambda item: 0 if item[0] == socket.AF_INET else 1) + + for family, ip in candidates: + try: + return await asyncio.wait_for( + asyncio.open_connection(ip, port, family=family or 0), + timeout=timeout, + ) + except Exception as exc: + fam = "ipv4" if family == socket.AF_INET else ( + "ipv6" if family == socket.AF_INET6 else "auto" + ) + errors.append(f"{ip} ({fam}): {exc!r}") + + raise OSError("; ".join(errors) or f"connect failed for {target}:{port}") + # ── Direct tunnel (no MITM) ─────────────────────────────────── async def _do_direct_tunnel(self, host: str, port: int, @@ -300,17 +639,17 @@ class ProxyServer: connect_ip overrides DNS: the TCP connection goes to that IP while the browser's TLS (SNI=host) is piped through unchanged. - Defaults to the configured google_ip for Google-category domains. + Without an override we connect to the real hostname so browser-safe + Google properties (Gemini assets, Play, Accounts, etc.) use their + normal edge instead of being forced onto the fronting IP. """ - target_ip = connect_ip or self.fronter.connect_host + target_ip = connect_ip or host try: - r_remote, w_remote = await asyncio.wait_for( - asyncio.open_connection(target_ip, port), timeout=10 - ) + r_remote, w_remote = await self._open_tcp_connection(target_ip, port, timeout=10) except Exception as e: log.error("Direct tunnel connect failed (%s via %s): %s", host, target_ip, e) - return + return False async def pipe(src, dst, label): try: @@ -334,6 +673,7 @@ class ProxyServer: pipe(reader, w_remote, f"client→{host}"), pipe(r_remote, writer, f"{host}→client"), ) + return True # ── SNI-rewrite tunnel ──────────────────────────────────────── @@ -407,6 +747,11 @@ class ProxyServer: # ── MITM CONNECT (apps_script mode) ─────────────────────────── + async def _do_plain_http_tunnel(self, host: str, port: int, reader, writer): + """Handle plain HTTP over SOCKS5 in apps_script mode.""" + log.info("Plain HTTP relay → %s:%d", host, port) + await self._relay_http_stream(host, port, reader, writer) + async def _do_mitm_connect(self, host: str, port: int, reader, writer): """Intercept TLS, decrypt HTTP, and relay through Apps Script.""" ssl_ctx = self.mitm.get_server_context(host) @@ -433,6 +778,10 @@ class ProxyServer: # Update writer to use the new TLS transport writer._transport = new_transport + await self._relay_http_stream(host, port, reader, writer) + + async def _relay_http_stream(self, host: str, port: int, reader, writer): + """Read decrypted/origin-form HTTP requests and relay them.""" # Read and relay HTTP requests from the browser (now decrypted) while True: try: @@ -471,11 +820,16 @@ class ProxyServer: k, v = raw_line.decode(errors="replace").split(":", 1) headers[k.strip()] = v.strip() - # Build full URL (browser sends just the path in CONNECT) - if port == 443: + # MITM traffic arrives as origin-form paths; SOCKS/plain HTTP can + # also send absolute-form requests. Normalize both to full URLs. + if path.startswith("http://") or path.startswith("https://"): + url = path + elif port == 443: url = f"https://{host}{path}" + elif port == 80: + url = f"http://{host}{path}" else: - url = f"https://{host}:{port}{path}" + url = f"http://{host}:{port}{path}" log.info("MITM → %s %s", method, url) @@ -502,7 +856,7 @@ class ProxyServer: # Check local cache first (GET only) response = None - if method == "GET" and not body: + if self._cache_allowed(method, url, headers, body): response = self._cache.get(url) if response: log.debug("Cache HIT: %s", url[:60]) @@ -522,7 +876,7 @@ class ProxyServer: ) # Cache successful GET responses - if method == "GET" and not body and response: + if self._cache_allowed(method, url, headers, body) and response: ttl = ResponseCache.parse_ttl(response, url) if ttl > 0: self._cache.put(url, response, ttl) @@ -533,6 +887,8 @@ class ProxyServer: if origin and response: response = self._inject_cors_headers(response, origin) + self._log_response_summary(url, response) + writer.write(response) await writer.drain() @@ -692,7 +1048,7 @@ class ProxyServer: # Cache check for GET response = None - if method == "GET" and not body: + if self._cache_allowed(method, url, headers, body): response = self._cache.get(url) if response: log.debug("Cache HIT (HTTP): %s", url[:60]) @@ -700,7 +1056,7 @@ class ProxyServer: if response is None: response = await self._relay_smart(method, url, headers, body) # Cache successful GET - if method == "GET" and not body and response: + if self._cache_allowed(method, url, headers, body) and response: ttl = ResponseCache.parse_ttl(response, url) if ttl > 0: self._cache.put(url, response, ttl) @@ -708,6 +1064,7 @@ class ProxyServer: # Inject CORS headers for cross-origin requests if origin and response: response = self._inject_cors_headers(response, origin) + self._log_response_summary(url, response) elif self.mode in ("google_fronting", "custom_domain", "domain_fronting"): # Use WebSocket tunnel for ALL traffic (much faster than forward()) response = await self._tunnel_http(header_block, body)