From e00873557ab07ff28dcf62b86ff644010b06d4f5 Mon Sep 17 00:00:00 2001 From: Mohammad Amin jahani Date: Sun, 10 May 2026 00:49:08 +0300 Subject: [PATCH] rotate script IDs when one returns a bad response --- src/domain_fronter.py | 162 ++++++++++++++++++++++++++++++++---------- 1 file changed, 125 insertions(+), 37 deletions(-) diff --git a/src/domain_fronter.py b/src/domain_fronter.py index d59a738..19c5071 100644 --- a/src/domain_fronter.py +++ b/src/domain_fronter.py @@ -60,6 +60,10 @@ class HostStat: errors: int = 0 +class _RelayBadResponse(Exception): + """Raised when a relay response indicates the chosen script ID is unhealthy.""" + + def _build_sni_pool(front_domain: str, overrides: list | None) -> list[str]: """Build the list of SNIs to rotate through on new outbound TLS handshakes. @@ -401,6 +405,16 @@ class DomainFronter: if force or until <= now: self._sid_blacklist.pop(sid, None) + def _next_alt_sid(self, tried: set[str]) -> str | None: + """Pick a script ID not already tried and not blacklisted, or None.""" + for sid in self._script_ids: + if sid in tried: + continue + if self._is_sid_blacklisted(sid): + continue + return sid + return None + def _pick_fanout_sids(self, key: str | None) -> list[str]: """Pick up to `parallel_relay` distinct non-blacklisted script IDs. @@ -842,8 +856,18 @@ class DomainFronter: {"m": "HEAD", "u": "http://example.com/", "k": self.auth_key} ).encode() hdrs = {"content-type": "application/json"} - sid = self._script_ids[0] + for sid in list(self._script_ids): + if self._is_sid_blacklisted(sid): + continue + if await self._prewarm_one_sid(sid, payload, hdrs): + return + self._blacklist_sid(sid, reason="prewarm") + log.debug("Pre-warm exhausted all script IDs") + + async def _prewarm_one_sid(self, sid: str, payload: bytes, + hdrs: dict) -> bool: + """Try /dev fast-path detection then /exec warmup for one sid.""" # Test /dev endpoint — returns data inline (no 302 redirect). # If it works, saves ~400ms per request by eliminating one round trip. try: @@ -857,19 +881,21 @@ class DomainFronter: timeout=15, ) dt = (time.perf_counter() - t0) * 1000 - data = json.loads(body.decode(errors="replace")) - if "s" in data: - self._dev_available = True - log.info("/dev fast path active (%.0fms, no redirect)", dt) - return + if status == 200: + data = json.loads(body.decode(errors="replace")) + if "s" in data: + self._dev_available = True + log.info("/dev fast path active (%.0fms, no redirect)", dt) + return True except Exception as e: - log.debug("/dev test failed: %s", e) + log.debug("/dev test failed for sid %s: %s", + sid[-8:] if len(sid) > 8 else sid, e) # Fallback: warm up with /exec try: exec_path = f"/macros/s/{sid}/exec" t0 = time.perf_counter() - await asyncio.wait_for( + status, _, _ = await asyncio.wait_for( self._h2.request( method="POST", path=exec_path, host=self.http_host, headers=hdrs, body=payload, @@ -877,9 +903,16 @@ class DomainFronter: timeout=15, ) dt = (time.perf_counter() - t0) * 1000 + if status != 200: + log.debug("Pre-warm /exec returned %d for sid %s", + status, sid[-8:] if len(sid) > 8 else sid) + return False log.info("Apps Script pre-warmed in %.0fms", dt) + return True except Exception as e: - log.debug("Pre-warm failed: %s", e) + log.debug("Pre-warm failed for sid %s: %s", + sid[-8:] if len(sid) > 8 else sid, e) + return False async def _keepalive_loop(self): """Send periodic pings to keep Apps Script warm + H2 connection alive.""" @@ -1665,6 +1698,15 @@ class DomainFronter: async def _relay_with_retry(self, payload: dict) -> bytes: """Single relay with one retry on failure. Uses H2 if available.""" attempts = self._retry_attempts_for_payload(payload) + host_key = self._host_key(payload.get("u")) + tried_sids: set[str] = set() + + def pick_sid() -> str: + if not tried_sids: + return self._script_id_for_key(host_key) + alt = self._next_alt_sid(tried_sids) + return alt if alt is not None else self._script_id_for_key(host_key) + # Fan-out: race N Apps Script instances when enabled and H2 is up. # Cuts tail latency when one container is slow/cold. Only kicks in # if multiple script IDs are configured and the H2 transport is live. @@ -1686,12 +1728,23 @@ class DomainFronter: # Try HTTP/2 first — much faster (multiplexed, no pool checkout) if self._h2_available(): for attempt in range(attempts): + sid = pick_sid() + tried_sids.add(sid) try: result = await asyncio.wait_for( - self._relay_single_h2(payload), timeout=self._relay_timeout + self._relay_single_h2(payload, sid=sid), + timeout=self._relay_timeout, ) self._record_h2_success() return result + except _RelayBadResponse as e: + self._blacklist_sid(sid, reason=str(e)[:40]) + if (attempt < attempts - 1 + and self._next_alt_sid(tried_sids) is not None): + log.debug("H2 sid %s bad (%s), rotating", + sid[-8:] if len(sid) > 8 else sid, e) + continue + raise except Exception as e: self._record_h2_failure(e) if attempt < attempts - 1: @@ -1716,10 +1769,21 @@ class DomainFronter: # HTTP/1.1 fallback (pool-based) async with self._semaphore: for attempt in range(attempts): + sid = pick_sid() + tried_sids.add(sid) try: return await asyncio.wait_for( - self._relay_single(payload), timeout=self._relay_timeout + self._relay_single(payload, sid=sid), + timeout=self._relay_timeout, ) + except _RelayBadResponse as e: + self._blacklist_sid(sid, reason=str(e)[:40]) + if (attempt < attempts - 1 + and self._next_alt_sid(tried_sids) is not None): + log.debug("H1 sid %s bad (%s), rotating", + sid[-8:] if len(sid) > 8 else sid, e) + continue + raise except Exception as e: if attempt < attempts - 1: log.debug("Relay attempt %d failed (%s: %s), retrying", @@ -1776,33 +1840,15 @@ class DomainFronter: if pending: await asyncio.gather(*pending, return_exceptions=True) - async def _relay_single_h2(self, payload: dict) -> bytes: + async def _relay_single_h2(self, payload: dict, + sid: str | None = None) -> bytes: """Execute a relay through HTTP/2 multiplexing. Uses the shared H2 connection — no pool checkout needed. Many concurrent calls all share one TLS connection. """ - full_payload = dict(payload) - full_payload["k"] = self.auth_key - json_body = json.dumps(full_payload).encode() - - path = self._exec_path(payload.get("u")) - - status, headers, body = await self._h2.request( - method="POST", path=path, host=self.http_host, - headers={"content-type": "application/json"}, - body=json_body, - ) - - return self._parse_relay_response(body) - - async def _relay_single_h2_with_sid(self, payload: dict, - sid: str) -> bytes: - """Execute an H2 relay pinned to a specific Apps Script deployment. - - Used by `_relay_fanout` to race multiple script IDs in parallel. - Mirrors `_relay_single_h2` but ignores the stable-hash routing. - """ + if sid is None: + sid = self._script_id_for_key(self._host_key(payload.get("u"))) full_payload = dict(payload) full_payload["k"] = self.auth_key json_body = json.dumps(full_payload).encode() @@ -1815,16 +1861,32 @@ class DomainFronter: body=json_body, ) - return self._parse_relay_response(body) + if status != 200: + raise _RelayBadResponse( + f"upstream HTTP {status} from script " + f"{sid[-8:] if len(sid) > 8 else sid}", + ) + return self._parse_or_raise(body) - async def _relay_single(self, payload: dict) -> bytes: + async def _relay_single_h2_with_sid(self, payload: dict, + sid: str) -> bytes: + """Execute an H2 relay pinned to a specific Apps Script deployment. + + Used by `_relay_fanout` to race multiple script IDs in parallel. + """ + return await self._relay_single_h2(payload, sid=sid) + + async def _relay_single(self, payload: dict, + sid: str | None = None) -> bytes: """Execute a single relay POST → redirect → parse.""" # Add auth key + if sid is None: + sid = self._script_id_for_key(self._host_key(payload.get("u"))) full_payload = dict(payload) full_payload["k"] = self.auth_key json_body = json.dumps(full_payload).encode() - path = self._exec_path(payload.get("u")) + path = self._exec_path_for_sid(sid) reader, writer, created = await self._acquire() try: @@ -1872,7 +1934,12 @@ class DomainFronter: status, resp_headers, resp_body = await self._read_http_response(reader) await self._release(reader, writer, created) - return self._parse_relay_response(resp_body) + if status != 200: + raise _RelayBadResponse( + f"upstream HTTP {status} from script " + f"{sid[-8:] if len(sid) > 8 else sid}", + ) + return self._parse_or_raise(resp_body) except Exception: try: @@ -2136,6 +2203,27 @@ class DomainFronter: return self._parse_relay_json(data) + def _parse_or_raise(self, body: bytes) -> bytes: + """Like `_parse_relay_response` but raises `_RelayBadResponse` on failure.""" + text = body.decode(errors="replace").strip() + if not text: + raise _RelayBadResponse("empty response") + + try: + data = json.loads(text) + except json.JSONDecodeError: + m = re.search(r'\{.*\}', text, re.DOTALL) + if not m: + raise _RelayBadResponse(f"non-JSON: {text[:120]}") + try: + data = json.loads(m.group()) + except json.JSONDecodeError: + raise _RelayBadResponse(f"bad JSON: {text[:120]}") + + if "e" in data: + raise _RelayBadResponse(f"relay error: {data['e']}") + return self._parse_relay_json(data) + def _parse_relay_json(self, data: dict) -> bytes: """Convert a parsed relay JSON dict to raw HTTP response bytes.""" if "e" in data: