diff --git a/src/core/logging_utils.py b/src/core/logging_utils.py index 96673c5..8f3df11 100644 --- a/src/core/logging_utils.py +++ b/src/core/logging_utils.py @@ -234,6 +234,21 @@ def configure(level: str = "INFO", *, stream=None) -> None: # stream in TLS via start_tls(); there's nothing actionable to do. _install_asyncio_noise_filter() + # Quiet very chatty third-party loggers even when the root logger is set + # to DEBUG: we only care about our own component DEBUG output. hpack + # emits one log line per header field, asyncio emits raw selector spam. + for noisy in ( + "hpack", + "hpack.hpack", + "hpack.table", + "h2", + "h2.connection", + "asyncio", + "urllib3", + "chardet", + ): + logging.getLogger(noisy).setLevel(logging.INFO) + class _AsyncioNoiseFilter(logging.Filter): _SUPPRESSED = ( diff --git a/src/relay/domain_fronter.py b/src/relay/domain_fronter.py index 92c41f7..a63f583 100644 --- a/src/relay/domain_fronter.py +++ b/src/relay/domain_fronter.py @@ -176,6 +176,10 @@ class DomainFronter: self._warmed = False self._refilling = False self._pool_min_idle = POOL_MIN_IDLE + # H1 is fallback-only when H2 is active. We don't know yet whether + # the H2 pool will succeed (set later in __init__), so default to the + # full warm count and let the H2 init below shrink it if applicable. + self._warm_count = WARM_POOL_COUNT self._maintenance_task: asyncio.Task | None = None self._keepalive_task: asyncio.Task | None = None self._warm_task: asyncio.Task | None = None @@ -209,6 +213,11 @@ class DomainFronter: self._coalesce: dict[str, list[asyncio.Future]] = {} self._h2_failure_streak = 0 self._h2_disabled_until = 0.0 + # When the H2 reader loop ends, EVERY in-flight stream raises a + # ConnectionError simultaneously. Without de-duping by connection + # generation, a single drop with 5+ in-flight streams trips the + # disable threshold and forces a 15s H1 fallback for no reason. + self._h2_last_failure_gen: int = -1 self._stream_download_disabled_until: dict[str, float] = {} # HTTP/2 multiplexing — pool of parallel connections for DPI bypass. @@ -244,6 +253,13 @@ class DomainFronter: "(each gets its own DPI token bucket)", n_conns, ) + # H1 is now fallback-only — shrink the pool we keep warm. + # We still want a few ready for instant fallback when H2 hits + # a transient failure (cooldown window), but maintaining 30 + # warm + 15 idle connections that are virtually never used + # wastes TLS handshakes and CPU. + self._warm_count = min(self._warm_count, 6) + self._pool_min_idle = min(self._pool_min_idle, 3) except ImportError: pass @@ -391,8 +407,35 @@ class DomainFronter: def _record_h2_success(self) -> None: self._h2_failure_streak = 0 + # Reset the generation guard so the *next* genuine drop is counted + # even if the connection happens to share its old generation key. + self._h2_last_failure_gen = -1 def _record_h2_failure(self, exc: Exception) -> None: + # De-dupe failures from a single connection drop event. When the + # H2 reader loop ends, every in-flight stream raises a transport + # error simultaneously — counting each as a separate failure trips + # the disable threshold from one drop with 5+ concurrent streams. + # Track failures per connection generation so a single drop counts + # at most once per H2 transport. + gen_key = -1 + try: + if self._h2_pool: + # Use the sum of generations across the pool as a proxy + # for "have any connections been re-established since the + # last failure?". Bumps once per reconnect. + gen_key = sum( + getattr(t, "_conn_generation", 0) for t in self._h2_pool + ) + elif self._h2 is not None: + gen_key = getattr(self._h2, "_conn_generation", 0) + except Exception: + gen_key = -1 + if gen_key == self._h2_last_failure_gen and gen_key != -1: + # Same drop event — already counted. + return + self._h2_last_failure_gen = gen_key + self._h2_failure_streak += 1 # Extend the cooldown window on every failure so a burst of concurrent # failures doesn't shorten the effective cooldown. @@ -1233,7 +1276,7 @@ class DomainFronter: async def _do_warm(self): """Open WARM_POOL_COUNT connections in parallel — failures are fine.""" await self._ensure_sni_ranked() - count = WARM_POOL_COUNT + count = self._warm_count coros = [self._add_conn_to_pool() for _ in range(count)] results = await asyncio.gather(*coros, return_exceptions=True) opened = sum(1 for r in results if not isinstance(r, Exception)) @@ -2085,8 +2128,16 @@ class DomainFronter: def _build_payload(self, method, url, headers, body): """Build the JSON relay payload dict.""" + # Apps Script's UrlFetchApp.fetch() does not accept HEAD or OPTIONS + # methods — passing either throws and the relay returns 502. Map + # them to GET on the wire (the upstream still gets the same response + # body, and HEAD-aware HTTP clients ignore the body anyway since + # they look at Content-Length / framing). This is the defensive + # mirror of the same normalisation done in Code.gs. + upper_method = method.upper() if method else "GET" + wire_method = "GET" if upper_method in ("HEAD", "OPTIONS") else method payload = { - "m": method, + "m": wire_method, "u": url, # Let the browser/app see origin redirects and cookies directly. "r": False, @@ -2375,13 +2426,28 @@ class DomainFronter: if not future.done(): future.set_result(result) except Exception as e: - log.warning( - "Batch relay failed, disabling batch mode for %ds cooldown. " - "Error: %s: %s", - self._batch_cooldown, type(e).__name__, e or "(no details)", - ) - self._batch_enabled = False - self._batch_disabled_at = time.time() + # Only globally disable batch mode for genuine failures (parse + # errors, protocol errors). A bare TimeoutError or transient + # connection drop is recoverable on the very next batch — keeping + # batch mode disabled for 60s while traffic floods (e.g. a Vercel + # marketing page with 200+ chunks) collapses every request into + # its own Apps Script execution and explodes quota usage. + transient = isinstance(e, (asyncio.TimeoutError, ConnectionError, + TimeoutError, OSError)) + if transient: + log.warning( + "Batch relay transient error (%s: %s) — falling back " + "individually but keeping batch mode enabled", + type(e).__name__, e or "(no details)", + ) + else: + log.warning( + "Batch relay failed, disabling batch mode for %ds cooldown. " + "Error: %s: %s", + self._batch_cooldown, type(e).__name__, e or "(no details)", + ) + self._batch_enabled = False + self._batch_disabled_at = time.time() # Fallback: send individually tasks = [] for payload, future in batch: @@ -2526,11 +2592,21 @@ class DomainFronter: path = self._exec_path_for_sid(sid) self._record_execution(sid) + t0 = time.perf_counter() status, headers, body = await (self._pick_h2() or self._h2).request( method="POST", path=path, host=self.http_host, headers={"content-type": "application/json"}, body=json_body, + timeout=self._relay_timeout, ) + if log.isEnabledFor(logging.DEBUG): + log.debug( + "H2 relay %s [%s] %.0fms (%d bytes)", + payload.get("m", "?"), + (payload.get("u") or "")[:60], + (time.perf_counter() - t0) * 1000.0, + len(body), + ) return parse_relay_response(body, self._max_response_body_bytes) @@ -2552,6 +2628,7 @@ class DomainFronter: method="POST", path=path, host=self.http_host, headers={"content-type": "application/json"}, body=json_body, + timeout=self._relay_timeout, ) return parse_relay_response(body, self._max_response_body_bytes) @@ -2654,18 +2731,33 @@ class DomainFronter: ) path = self._exec_path_for_sid(sid) - # Try HTTP/2 first + # Try HTTP/2 first. Use the configured relay_timeout: batches can + # carry the combined response of N requests so they need at least as + # much time as a single relay. A hardcoded 30s timed out legitimate + # large-asset bursts and forced batch mode into a 60s cooldown, + # collapsing every subsequent request into its own Apps Script + # execution (huge quota burn). if self._h2_available(): + batch_timeout = max(self._relay_timeout, 30.0) try: self._record_execution(sid) + t0 = time.perf_counter() status, headers, body = await asyncio.wait_for( (self._pick_h2() or self._h2).request( method="POST", path=path, host=self.http_host, headers={"content-type": "application/json"}, body=json_body, + timeout=batch_timeout, ), - timeout=30, + timeout=batch_timeout, ) + if log.isEnabledFor(logging.DEBUG): + log.debug( + "H2 batch %d items: %.0fms (%d bytes)", + len(payloads), + (time.perf_counter() - t0) * 1000.0, + len(body), + ) self._record_h2_success() return self._parse_batch_body(body, payloads) except Exception as e: diff --git a/src/relay/h2_transport.py b/src/relay/h2_transport.py index 77b7e0c..3d5a657 100644 --- a/src/relay/h2_transport.py +++ b/src/relay/h2_transport.py @@ -18,6 +18,7 @@ import asyncio import logging import socket import ssl +import time from urllib.parse import urlparse try: @@ -104,9 +105,18 @@ class H2Transport: # Per-stream tracking self._streams: dict[int, _StreamState] = {} + # Connection-level keepalive: ONE pinger task per connection, runs + # only while at least one stream is in flight. Replaces the previous + # per-stream ping task (N concurrent streams → N pingers each at + # ping_interval cadence → N×(1/interval) pings/sec, all serialised on + # _write_lock). With 20 in-flight streams and 0.2s interval that was + # ~100 PING/s contending with stream sends. + self._inflight_pinger_task: asyncio.Task | None = None + # Stats self.total_requests = 0 self.total_streams = 0 + self._ping_count = 0 # ── Connection lifecycle ────────────────────────────────────── @@ -246,6 +256,11 @@ class H2Transport: if read_task: read_task.cancel() await asyncio.gather(read_task, return_exceptions=True) + pinger_task = self._inflight_pinger_task + self._inflight_pinger_task = None + if pinger_task: + pinger_task.cancel() + await asyncio.gather(pinger_task, return_exceptions=True) if self._writer: try: writer = self._writer @@ -340,28 +355,17 @@ class H2Transport: await self._flush() - # Wait for complete response, sending H2 PING frames concurrently. - # The PING/ACK bidirectional traffic masks the Apps Script execution - # silence that Iran DPI uses to identify and throttle relay patterns. - ping_task = None - if self._ping_interval > 0: - ping_task = asyncio.create_task( - self._ping_keepalive(state, self._ping_interval) - ) + # Connection-level pinger: start if not running. ONE task pings the + # connection while ANY stream is in-flight. Replaces the previous + # per-stream pinger that produced O(N) pings/sec with N streams. + self._ensure_inflight_pinger() try: await asyncio.wait_for(state.done.wait(), timeout=timeout) except asyncio.TimeoutError: - if ping_task: - ping_task.cancel() - await asyncio.gather(ping_task, return_exceptions=True) self._streams.pop(stream_id, None) raise TimeoutError( f"H2 stream {stream_id} timed out ({timeout}s)" ) - finally: - if ping_task: - ping_task.cancel() - await asyncio.gather(ping_task, return_exceptions=True) self._streams.pop(stream_id, None) @@ -376,29 +380,62 @@ class H2Transport: return state.status, state.headers, resp_body - async def _ping_keepalive(self, state: "_StreamState", interval: float): - """Send H2 PING frames at *interval* seconds until *state* completes. + def _ensure_inflight_pinger(self) -> None: + """Start the connection-level inflight pinger if not already running. - Iran DPI throttles relay responses when it detects a long TCP silence + ONE task pings the connection at ping_interval while at least one + stream is in flight. Auto-exits when no streams remain so it + doesn't ping during idle periods (keepalive_loop handles those). + """ + if self._ping_interval <= 0: + return + if self._inflight_pinger_task and not self._inflight_pinger_task.done(): + return + self._inflight_pinger_task = asyncio.create_task( + self._inflight_pinger_loop() + ) + + async def _inflight_pinger_loop(self): + """Background: ping while any stream is in-flight. + + Iran DPI throttles relay responses when it detects long TCP silence between the client's POST upload and the server's first response byte - (Apps Script typically takes 2-5 s to execute). Sending PING frames + (Apps Script typically takes 2-5s to execute). Sending PING frames forces the server to emit PING ACK frames, creating continuous bidirectional traffic that looks like a normal persistent connection rather than a relay waiting for proxy execution. + + One pinger per connection regardless of in-flight stream count. """ - opaque = b"dpi-ping" # 8-byte PING opaque data + opaque = b"dpi-ping" + # Keep pinging for ~5s after the last stream finishes so Apps Script + # doesn't drop the connection during the brief gaps between bursts of + # requests (e.g. user navigating between pages). The full keepalive + # loop in domain_fronter pings only every 60s, which leaves a window + # where the server can send GOAWAY and the next request loses the race. + idle_grace_seconds = 5.0 + idle_since: float | None = None try: - while not state.done.is_set(): - await asyncio.sleep(interval) - if state.done.is_set(): + while True: + await asyncio.sleep(self._ping_interval) + if not self._connected or not self._h2: break + if not self._streams: + now = time.monotonic() + if idle_since is None: + idle_since = now + elif (now - idle_since) >= idle_grace_seconds: + break + else: + idle_since = None try: async with self._write_lock: if self._h2 and self._connected: self._h2.ping(opaque) await self._flush() + self._ping_count += 1 except Exception: - break # connection lost; let the main wait() handle it + break # connection lost; main wait() will surface error except asyncio.CancelledError: pass diff --git a/src/relay/relay_response.py b/src/relay/relay_response.py index 9498faf..901e094 100644 --- a/src/relay/relay_response.py +++ b/src/relay/relay_response.py @@ -268,6 +268,15 @@ def _decode_target_body(body: bytes, encoding: str) -> tuple[bytes, bool]: if not body or not encodings: return body, False + # Apps Script's UrlFetchApp transparently decompresses gzip/br/deflate + # responses but preserves the original Content-Encoding header in the + # forwarded metadata. The body therefore arrives as already-plain bytes + # while still being labelled (e.g.) "br". Detect that case up front so + # we don't pay the CPU cost of a guaranteed-failing brotli/zstd decode + # on every relayed response. + if _looks_like_plain_body(body): + return body, True + decoded = body for layer in reversed(encodings): before = decoded