mirror of
https://github.com/denuitt1/mhr-cfw.git
synced 2026-05-17 21:24:36 +03:00
Merge pull request #157 from onlymaj/fix/multi-script-broken-id-blacklist
fix(relay): rotate script IDs when one returns a bad response
This commit is contained in:
+125
-37
@@ -60,6 +60,10 @@ class HostStat:
|
||||
errors: int = 0
|
||||
|
||||
|
||||
class _RelayBadResponse(Exception):
    """Signal that a relay reply shows the selected script ID is unhealthy.

    Raised by the relay helpers for a non-200 upstream status or for a body
    that is empty, not parseable as JSON, or carries a relay error, so the
    retry loop can blacklist that script ID and rotate to another one.
    """
|
||||
|
||||
|
||||
def _build_sni_pool(front_domain: str, overrides: list | None) -> list[str]:
|
||||
"""Build the list of SNIs to rotate through on new outbound TLS handshakes.
|
||||
|
||||
@@ -401,6 +405,16 @@ class DomainFronter:
|
||||
if force or until <= now:
|
||||
self._sid_blacklist.pop(sid, None)
|
||||
|
||||
def _next_alt_sid(self, tried: set[str]) -> str | None:
    """Return the first usable script ID that has not been attempted yet.

    Walks ``self._script_ids`` in order, skipping every ID already present
    in ``tried`` and every ID that ``self._is_sid_blacklisted`` reports as
    blacklisted. Returns ``None`` when no candidate remains.
    """
    # Lazy generator: the blacklist check runs only for untried IDs and
    # stops at the first match, exactly like an explicit loop would.
    candidates = (
        candidate
        for candidate in self._script_ids
        if candidate not in tried
        and not self._is_sid_blacklisted(candidate)
    )
    return next(candidates, None)
|
||||
|
||||
def _pick_fanout_sids(self, key: str | None) -> list[str]:
|
||||
"""Pick up to `parallel_relay` distinct non-blacklisted script IDs.
|
||||
|
||||
@@ -842,8 +856,18 @@ class DomainFronter:
|
||||
{"m": "HEAD", "u": "http://example.com/", "k": self.auth_key}
|
||||
).encode()
|
||||
hdrs = {"content-type": "application/json"}
|
||||
sid = self._script_ids[0]
|
||||
|
||||
for sid in list(self._script_ids):
|
||||
if self._is_sid_blacklisted(sid):
|
||||
continue
|
||||
if await self._prewarm_one_sid(sid, payload, hdrs):
|
||||
return
|
||||
self._blacklist_sid(sid, reason="prewarm")
|
||||
log.debug("Pre-warm exhausted all script IDs")
|
||||
|
||||
async def _prewarm_one_sid(self, sid: str, payload: bytes,
|
||||
hdrs: dict) -> bool:
|
||||
"""Try /dev fast-path detection then /exec warmup for one sid."""
|
||||
# Test /dev endpoint — returns data inline (no 302 redirect).
|
||||
# If it works, saves ~400ms per request by eliminating one round trip.
|
||||
try:
|
||||
@@ -857,19 +881,21 @@ class DomainFronter:
|
||||
timeout=15,
|
||||
)
|
||||
dt = (time.perf_counter() - t0) * 1000
|
||||
data = json.loads(body.decode(errors="replace"))
|
||||
if "s" in data:
|
||||
self._dev_available = True
|
||||
log.info("/dev fast path active (%.0fms, no redirect)", dt)
|
||||
return
|
||||
if status == 200:
|
||||
data = json.loads(body.decode(errors="replace"))
|
||||
if "s" in data:
|
||||
self._dev_available = True
|
||||
log.info("/dev fast path active (%.0fms, no redirect)", dt)
|
||||
return True
|
||||
except Exception as e:
|
||||
log.debug("/dev test failed: %s", e)
|
||||
log.debug("/dev test failed for sid %s: %s",
|
||||
sid[-8:] if len(sid) > 8 else sid, e)
|
||||
|
||||
# Fallback: warm up with /exec
|
||||
try:
|
||||
exec_path = f"/macros/s/{sid}/exec"
|
||||
t0 = time.perf_counter()
|
||||
await asyncio.wait_for(
|
||||
status, _, _ = await asyncio.wait_for(
|
||||
self._h2.request(
|
||||
method="POST", path=exec_path, host=self.http_host,
|
||||
headers=hdrs, body=payload,
|
||||
@@ -877,9 +903,16 @@ class DomainFronter:
|
||||
timeout=15,
|
||||
)
|
||||
dt = (time.perf_counter() - t0) * 1000
|
||||
if status != 200:
|
||||
log.debug("Pre-warm /exec returned %d for sid %s",
|
||||
status, sid[-8:] if len(sid) > 8 else sid)
|
||||
return False
|
||||
log.info("Apps Script pre-warmed in %.0fms", dt)
|
||||
return True
|
||||
except Exception as e:
|
||||
log.debug("Pre-warm failed: %s", e)
|
||||
log.debug("Pre-warm failed for sid %s: %s",
|
||||
sid[-8:] if len(sid) > 8 else sid, e)
|
||||
return False
|
||||
|
||||
async def _keepalive_loop(self):
|
||||
"""Send periodic pings to keep Apps Script warm + H2 connection alive."""
|
||||
@@ -1665,6 +1698,15 @@ class DomainFronter:
|
||||
async def _relay_with_retry(self, payload: dict) -> bytes:
|
||||
"""Single relay with one retry on failure. Uses H2 if available."""
|
||||
attempts = self._retry_attempts_for_payload(payload)
|
||||
host_key = self._host_key(payload.get("u"))
|
||||
tried_sids: set[str] = set()
|
||||
|
||||
def pick_sid() -> str:
|
||||
if not tried_sids:
|
||||
return self._script_id_for_key(host_key)
|
||||
alt = self._next_alt_sid(tried_sids)
|
||||
return alt if alt is not None else self._script_id_for_key(host_key)
|
||||
|
||||
# Fan-out: race N Apps Script instances when enabled and H2 is up.
|
||||
# Cuts tail latency when one container is slow/cold. Only kicks in
|
||||
# if multiple script IDs are configured and the H2 transport is live.
|
||||
@@ -1686,12 +1728,23 @@ class DomainFronter:
|
||||
# Try HTTP/2 first — much faster (multiplexed, no pool checkout)
|
||||
if self._h2_available():
|
||||
for attempt in range(attempts):
|
||||
sid = pick_sid()
|
||||
tried_sids.add(sid)
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
self._relay_single_h2(payload), timeout=self._relay_timeout
|
||||
self._relay_single_h2(payload, sid=sid),
|
||||
timeout=self._relay_timeout,
|
||||
)
|
||||
self._record_h2_success()
|
||||
return result
|
||||
except _RelayBadResponse as e:
|
||||
self._blacklist_sid(sid, reason=str(e)[:40])
|
||||
if (attempt < attempts - 1
|
||||
and self._next_alt_sid(tried_sids) is not None):
|
||||
log.debug("H2 sid %s bad (%s), rotating",
|
||||
sid[-8:] if len(sid) > 8 else sid, e)
|
||||
continue
|
||||
raise
|
||||
except Exception as e:
|
||||
self._record_h2_failure(e)
|
||||
if attempt < attempts - 1:
|
||||
@@ -1716,10 +1769,21 @@ class DomainFronter:
|
||||
# HTTP/1.1 fallback (pool-based)
|
||||
async with self._semaphore:
|
||||
for attempt in range(attempts):
|
||||
sid = pick_sid()
|
||||
tried_sids.add(sid)
|
||||
try:
|
||||
return await asyncio.wait_for(
|
||||
self._relay_single(payload), timeout=self._relay_timeout
|
||||
self._relay_single(payload, sid=sid),
|
||||
timeout=self._relay_timeout,
|
||||
)
|
||||
except _RelayBadResponse as e:
|
||||
self._blacklist_sid(sid, reason=str(e)[:40])
|
||||
if (attempt < attempts - 1
|
||||
and self._next_alt_sid(tried_sids) is not None):
|
||||
log.debug("H1 sid %s bad (%s), rotating",
|
||||
sid[-8:] if len(sid) > 8 else sid, e)
|
||||
continue
|
||||
raise
|
||||
except Exception as e:
|
||||
if attempt < attempts - 1:
|
||||
log.debug("Relay attempt %d failed (%s: %s), retrying",
|
||||
@@ -1776,33 +1840,15 @@ class DomainFronter:
|
||||
if pending:
|
||||
await asyncio.gather(*pending, return_exceptions=True)
|
||||
|
||||
async def _relay_single_h2(self, payload: dict) -> bytes:
|
||||
async def _relay_single_h2(self, payload: dict,
|
||||
sid: str | None = None) -> bytes:
|
||||
"""Execute a relay through HTTP/2 multiplexing.
|
||||
|
||||
Uses the shared H2 connection — no pool checkout needed.
|
||||
Many concurrent calls all share one TLS connection.
|
||||
"""
|
||||
full_payload = dict(payload)
|
||||
full_payload["k"] = self.auth_key
|
||||
json_body = json.dumps(full_payload).encode()
|
||||
|
||||
path = self._exec_path(payload.get("u"))
|
||||
|
||||
status, headers, body = await self._h2.request(
|
||||
method="POST", path=path, host=self.http_host,
|
||||
headers={"content-type": "application/json"},
|
||||
body=json_body,
|
||||
)
|
||||
|
||||
return self._parse_relay_response(body)
|
||||
|
||||
async def _relay_single_h2_with_sid(self, payload: dict,
|
||||
sid: str) -> bytes:
|
||||
"""Execute an H2 relay pinned to a specific Apps Script deployment.
|
||||
|
||||
Used by `_relay_fanout` to race multiple script IDs in parallel.
|
||||
Mirrors `_relay_single_h2` but ignores the stable-hash routing.
|
||||
"""
|
||||
if sid is None:
|
||||
sid = self._script_id_for_key(self._host_key(payload.get("u")))
|
||||
full_payload = dict(payload)
|
||||
full_payload["k"] = self.auth_key
|
||||
json_body = json.dumps(full_payload).encode()
|
||||
@@ -1815,16 +1861,32 @@ class DomainFronter:
|
||||
body=json_body,
|
||||
)
|
||||
|
||||
return self._parse_relay_response(body)
|
||||
if status != 200:
|
||||
raise _RelayBadResponse(
|
||||
f"upstream HTTP {status} from script "
|
||||
f"{sid[-8:] if len(sid) > 8 else sid}",
|
||||
)
|
||||
return self._parse_or_raise(body)
|
||||
|
||||
async def _relay_single(self, payload: dict) -> bytes:
|
||||
async def _relay_single_h2_with_sid(self, payload: dict,
                                    sid: str) -> bytes:
    """Run an H2 relay against one explicitly chosen Apps Script deployment.

    Thin wrapper used by `_relay_fanout` to race multiple script IDs in
    parallel; the actual request is delegated to `_relay_single_h2` with
    the given ``sid``.
    """
    response = await self._relay_single_h2(payload, sid=sid)
    return response
|
||||
|
||||
async def _relay_single(self, payload: dict,
|
||||
sid: str | None = None) -> bytes:
|
||||
"""Execute a single relay POST → redirect → parse."""
|
||||
# Add auth key
|
||||
if sid is None:
|
||||
sid = self._script_id_for_key(self._host_key(payload.get("u")))
|
||||
full_payload = dict(payload)
|
||||
full_payload["k"] = self.auth_key
|
||||
json_body = json.dumps(full_payload).encode()
|
||||
|
||||
path = self._exec_path(payload.get("u"))
|
||||
path = self._exec_path_for_sid(sid)
|
||||
reader, writer, created = await self._acquire()
|
||||
|
||||
try:
|
||||
@@ -1872,7 +1934,12 @@ class DomainFronter:
|
||||
status, resp_headers, resp_body = await self._read_http_response(reader)
|
||||
|
||||
await self._release(reader, writer, created)
|
||||
return self._parse_relay_response(resp_body)
|
||||
if status != 200:
|
||||
raise _RelayBadResponse(
|
||||
f"upstream HTTP {status} from script "
|
||||
f"{sid[-8:] if len(sid) > 8 else sid}",
|
||||
)
|
||||
return self._parse_or_raise(resp_body)
|
||||
|
||||
except Exception:
|
||||
try:
|
||||
@@ -2136,6 +2203,27 @@ class DomainFronter:
|
||||
|
||||
return self._parse_relay_json(data)
|
||||
|
||||
def _parse_or_raise(self, body: bytes) -> bytes:
    """Parse a relay body, raising `_RelayBadResponse` when it is unusable.

    Like `_parse_relay_response`, but every failure is reported as
    `_RelayBadResponse` instead of being returned, so callers can treat the
    script ID that produced the body as unhealthy.

    Args:
        body: Raw response body received from the relay.

    Returns:
        The value produced by `_parse_relay_json` for the decoded JSON
        object.

    Raises:
        _RelayBadResponse: If the body is empty, contains no parseable
            JSON, decodes to something other than a JSON object, or
            carries an ``"e"`` field.
    """
    text = body.decode(errors="replace").strip()
    if not text:
        raise _RelayBadResponse("empty response")

    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        # The JSON object may be embedded in surrounding text; retry on the
        # outermost {...} span before giving up.
        m = re.search(r'\{.*\}', text, re.DOTALL)
        if not m:
            raise _RelayBadResponse(f"non-JSON: {text[:120]}") from exc
        try:
            data = json.loads(m.group())
        except json.JSONDecodeError as inner_exc:
            raise _RelayBadResponse(f"bad JSON: {text[:120]}") from inner_exc

    # json.loads also accepts arrays, strings, numbers and null. Those would
    # make the "e" lookup below raise TypeError (or do a substring test on a
    # str), bypassing the _RelayBadResponse handling, so reject them here.
    if not isinstance(data, dict):
        raise _RelayBadResponse(f"non-object JSON: {text[:120]}")

    if "e" in data:
        raise _RelayBadResponse(f"relay error: {data['e']}")
    return self._parse_relay_json(data)
|
||||
|
||||
def _parse_relay_json(self, data: dict) -> bytes:
|
||||
"""Convert a parsed relay JSON dict to raw HTTP response bytes."""
|
||||
if "e" in data:
|
||||
|
||||
Reference in New Issue
Block a user