Enhance logging and connection management in DomainFronter and H2Transport

- Suppress noisy third-party loggers in logging_utils.py
- Adjust warm connection count and pool management in DomainFronter
- Implement connection-level keepalive pinger in H2Transport
- Optimize response handling in relay_response.py for already-plain bodies
This commit is contained in:
Abolfazl
2026-05-09 19:32:15 +03:30
parent c258ae7c9e
commit 9022eee784
4 changed files with 188 additions and 35 deletions
+15
View File
@@ -234,6 +234,21 @@ def configure(level: str = "INFO", *, stream=None) -> None:
# stream in TLS via start_tls(); there's nothing actionable to do.
_install_asyncio_noise_filter()
# Quiet very chatty third-party loggers even when the root logger is set
# to DEBUG: we only care about our own component DEBUG output. hpack
# emits one log line per header field, asyncio emits raw selector spam.
for noisy in (
"hpack",
"hpack.hpack",
"hpack.table",
"h2",
"h2.connection",
"asyncio",
"urllib3",
"chardet",
):
logging.getLogger(noisy).setLevel(logging.INFO)
class _AsyncioNoiseFilter(logging.Filter):
_SUPPRESSED = (
+103 -11
View File
@@ -176,6 +176,10 @@ class DomainFronter:
self._warmed = False
self._refilling = False
self._pool_min_idle = POOL_MIN_IDLE
# H1 is fallback-only when H2 is active. We don't know yet whether
# the H2 pool will succeed (set later in __init__), so default to the
# full warm count and let the H2 init below shrink it if applicable.
self._warm_count = WARM_POOL_COUNT
self._maintenance_task: asyncio.Task | None = None
self._keepalive_task: asyncio.Task | None = None
self._warm_task: asyncio.Task | None = None
@@ -209,6 +213,11 @@ class DomainFronter:
self._coalesce: dict[str, list[asyncio.Future]] = {}
self._h2_failure_streak = 0
self._h2_disabled_until = 0.0
# When the H2 reader loop ends, EVERY in-flight stream raises a
# ConnectionError simultaneously. Without de-duping by connection
# generation, a single drop with 5+ in-flight streams trips the
# disable threshold and forces a 15s H1 fallback for no reason.
self._h2_last_failure_gen: int = -1
self._stream_download_disabled_until: dict[str, float] = {}
# HTTP/2 multiplexing — pool of parallel connections for DPI bypass.
@@ -244,6 +253,13 @@ class DomainFronter:
"(each gets its own DPI token bucket)",
n_conns,
)
# H1 is now fallback-only — shrink the pool we keep warm.
# We still want a few ready for instant fallback when H2 hits
# a transient failure (cooldown window), but maintaining 30
# warm + 15 idle connections that are virtually never used
# wastes TLS handshakes and CPU.
self._warm_count = min(self._warm_count, 6)
self._pool_min_idle = min(self._pool_min_idle, 3)
except ImportError:
pass
@@ -391,8 +407,35 @@ class DomainFronter:
def _record_h2_success(self) -> None:
self._h2_failure_streak = 0
# Reset the generation guard so the *next* genuine drop is counted
# even if the connection happens to share its old generation key.
self._h2_last_failure_gen = -1
def _record_h2_failure(self, exc: Exception) -> None:
# De-dupe failures from a single connection drop event. When the
# H2 reader loop ends, every in-flight stream raises a transport
# error simultaneously — counting each as a separate failure trips
# the disable threshold from one drop with 5+ concurrent streams.
# Track failures per connection generation so a single drop counts
# at most once per H2 transport.
gen_key = -1
try:
if self._h2_pool:
# Use the sum of generations across the pool as a proxy
# for "have any connections been re-established since the
# last failure?". Bumps once per reconnect.
gen_key = sum(
getattr(t, "_conn_generation", 0) for t in self._h2_pool
)
elif self._h2 is not None:
gen_key = getattr(self._h2, "_conn_generation", 0)
except Exception:
gen_key = -1
if gen_key == self._h2_last_failure_gen and gen_key != -1:
# Same drop event — already counted.
return
self._h2_last_failure_gen = gen_key
self._h2_failure_streak += 1
# Extend the cooldown window on every failure so a burst of concurrent
# failures doesn't shorten the effective cooldown.
@@ -1233,7 +1276,7 @@ class DomainFronter:
async def _do_warm(self):
"""Open WARM_POOL_COUNT connections in parallel — failures are fine."""
await self._ensure_sni_ranked()
count = WARM_POOL_COUNT
count = self._warm_count
coros = [self._add_conn_to_pool() for _ in range(count)]
results = await asyncio.gather(*coros, return_exceptions=True)
opened = sum(1 for r in results if not isinstance(r, Exception))
@@ -2085,8 +2128,16 @@ class DomainFronter:
def _build_payload(self, method, url, headers, body):
"""Build the JSON relay payload dict."""
# Apps Script's UrlFetchApp.fetch() does not accept HEAD or OPTIONS
# methods — passing either throws and the relay returns 502. Map
# them to GET on the wire (the upstream still gets the same response
# body, and HEAD-aware HTTP clients ignore the body anyway since
# they look at Content-Length / framing). This is the defensive
# mirror of the same normalisation done in Code.gs.
upper_method = method.upper() if method else "GET"
wire_method = "GET" if upper_method in ("HEAD", "OPTIONS") else method
payload = {
"m": method,
"m": wire_method,
"u": url,
# Let the browser/app see origin redirects and cookies directly.
"r": False,
@@ -2375,13 +2426,28 @@ class DomainFronter:
if not future.done():
future.set_result(result)
except Exception as e:
log.warning(
"Batch relay failed, disabling batch mode for %ds cooldown. "
"Error: %s: %s",
self._batch_cooldown, type(e).__name__, e or "(no details)",
)
self._batch_enabled = False
self._batch_disabled_at = time.time()
# Only globally disable batch mode for genuine failures (parse
# errors, protocol errors). A bare TimeoutError or transient
# connection drop is recoverable on the very next batch — keeping
# batch mode disabled for 60s while traffic floods (e.g. a Vercel
# marketing page with 200+ chunks) collapses every request into
# its own Apps Script execution and explodes quota usage.
transient = isinstance(e, (asyncio.TimeoutError, ConnectionError,
TimeoutError, OSError))
if transient:
log.warning(
"Batch relay transient error (%s: %s) — falling back "
"individually but keeping batch mode enabled",
type(e).__name__, e or "(no details)",
)
else:
log.warning(
"Batch relay failed, disabling batch mode for %ds cooldown. "
"Error: %s: %s",
self._batch_cooldown, type(e).__name__, e or "(no details)",
)
self._batch_enabled = False
self._batch_disabled_at = time.time()
# Fallback: send individually
tasks = []
for payload, future in batch:
@@ -2526,11 +2592,21 @@ class DomainFronter:
path = self._exec_path_for_sid(sid)
self._record_execution(sid)
t0 = time.perf_counter()
status, headers, body = await (self._pick_h2() or self._h2).request(
method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"},
body=json_body,
timeout=self._relay_timeout,
)
if log.isEnabledFor(logging.DEBUG):
log.debug(
"H2 relay %s [%s] %.0fms (%d bytes)",
payload.get("m", "?"),
(payload.get("u") or "")[:60],
(time.perf_counter() - t0) * 1000.0,
len(body),
)
return parse_relay_response(body, self._max_response_body_bytes)
@@ -2552,6 +2628,7 @@ class DomainFronter:
method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"},
body=json_body,
timeout=self._relay_timeout,
)
return parse_relay_response(body, self._max_response_body_bytes)
@@ -2654,18 +2731,33 @@ class DomainFronter:
)
path = self._exec_path_for_sid(sid)
# Try HTTP/2 first
# Try HTTP/2 first. Use the configured relay_timeout: batches can
# carry the combined response of N requests so they need at least as
# much time as a single relay. A hardcoded 30s timed out legitimate
# large-asset bursts and forced batch mode into a 60s cooldown,
# collapsing every subsequent request into its own Apps Script
# execution (huge quota burn).
if self._h2_available():
batch_timeout = max(self._relay_timeout, 30.0)
try:
self._record_execution(sid)
t0 = time.perf_counter()
status, headers, body = await asyncio.wait_for(
(self._pick_h2() or self._h2).request(
method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"},
body=json_body,
timeout=batch_timeout,
),
timeout=30,
timeout=batch_timeout,
)
if log.isEnabledFor(logging.DEBUG):
log.debug(
"H2 batch %d items: %.0fms (%d bytes)",
len(payloads),
(time.perf_counter() - t0) * 1000.0,
len(body),
)
self._record_h2_success()
return self._parse_batch_body(body, payloads)
except Exception as e:
+61 -24
View File
@@ -18,6 +18,7 @@ import asyncio
import logging
import socket
import ssl
import time
from urllib.parse import urlparse
try:
@@ -104,9 +105,18 @@ class H2Transport:
# Per-stream tracking
self._streams: dict[int, _StreamState] = {}
# Connection-level keepalive: ONE pinger task per connection, runs
# only while at least one stream is in flight. Replaces the previous
# per-stream ping task (N concurrent streams → N pingers each at
# ping_interval cadence → N×(1/interval) pings/sec, all serialised on
# _write_lock). With 20 in-flight streams and 0.2s interval that was
# ~100 PING/s contending with stream sends.
self._inflight_pinger_task: asyncio.Task | None = None
# Stats
self.total_requests = 0
self.total_streams = 0
self._ping_count = 0
# ── Connection lifecycle ──────────────────────────────────────
@@ -246,6 +256,11 @@ class H2Transport:
if read_task:
read_task.cancel()
await asyncio.gather(read_task, return_exceptions=True)
pinger_task = self._inflight_pinger_task
self._inflight_pinger_task = None
if pinger_task:
pinger_task.cancel()
await asyncio.gather(pinger_task, return_exceptions=True)
if self._writer:
try:
writer = self._writer
@@ -340,28 +355,17 @@ class H2Transport:
await self._flush()
# Wait for complete response, sending H2 PING frames concurrently.
# The PING/ACK bidirectional traffic masks the Apps Script execution
# silence that Iran DPI uses to identify and throttle relay patterns.
ping_task = None
if self._ping_interval > 0:
ping_task = asyncio.create_task(
self._ping_keepalive(state, self._ping_interval)
)
# Connection-level pinger: start if not running. ONE task pings the
# connection while ANY stream is in-flight. Replaces the previous
# per-stream pinger that produced O(N) pings/sec with N streams.
self._ensure_inflight_pinger()
try:
await asyncio.wait_for(state.done.wait(), timeout=timeout)
except asyncio.TimeoutError:
if ping_task:
ping_task.cancel()
await asyncio.gather(ping_task, return_exceptions=True)
self._streams.pop(stream_id, None)
raise TimeoutError(
f"H2 stream {stream_id} timed out ({timeout}s)"
)
finally:
if ping_task:
ping_task.cancel()
await asyncio.gather(ping_task, return_exceptions=True)
self._streams.pop(stream_id, None)
@@ -376,29 +380,62 @@ class H2Transport:
return state.status, state.headers, resp_body
async def _ping_keepalive(self, state: "_StreamState", interval: float):
"""Send H2 PING frames at *interval* seconds until *state* completes.
def _ensure_inflight_pinger(self) -> None:
"""Start the connection-level inflight pinger if not already running.
Iran DPI throttles relay responses when it detects a long TCP silence
ONE task pings the connection at ping_interval while at least one
stream is in flight. Auto-exits when no streams remain so it
doesn't ping during idle periods (keepalive_loop handles those).
"""
if self._ping_interval <= 0:
return
if self._inflight_pinger_task and not self._inflight_pinger_task.done():
return
self._inflight_pinger_task = asyncio.create_task(
self._inflight_pinger_loop()
)
async def _inflight_pinger_loop(self):
"""Background: ping while any stream is in-flight.
Iran DPI throttles relay responses when it detects long TCP silence
between the client's POST upload and the server's first response byte
(Apps Script typically takes 2-5 s to execute). Sending PING frames
(Apps Script typically takes 2-5s to execute). Sending PING frames
forces the server to emit PING ACK frames, creating continuous
bidirectional traffic that looks like a normal persistent connection
rather than a relay waiting for proxy execution.
One pinger per connection regardless of in-flight stream count.
"""
opaque = b"dpi-ping" # 8-byte PING opaque data
opaque = b"dpi-ping"
# Keep pinging for ~5s after the last stream finishes so Apps Script
# doesn't drop the connection during the brief gaps between bursts of
# requests (e.g. user navigating between pages). The full keepalive
# loop in domain_fronter pings only every 60s, which leaves a window
# where the server can send GOAWAY and the next request loses the race.
idle_grace_seconds = 5.0
idle_since: float | None = None
try:
while not state.done.is_set():
await asyncio.sleep(interval)
if state.done.is_set():
while True:
await asyncio.sleep(self._ping_interval)
if not self._connected or not self._h2:
break
if not self._streams:
now = time.monotonic()
if idle_since is None:
idle_since = now
elif (now - idle_since) >= idle_grace_seconds:
break
else:
idle_since = None
try:
async with self._write_lock:
if self._h2 and self._connected:
self._h2.ping(opaque)
await self._flush()
self._ping_count += 1
except Exception:
break # connection lost; let the main wait() handle it
break # connection lost; main wait() will surface error
except asyncio.CancelledError:
pass
+9
View File
@@ -268,6 +268,15 @@ def _decode_target_body(body: bytes, encoding: str) -> tuple[bytes, bool]:
if not body or not encodings:
return body, False
# Apps Script's UrlFetchApp transparently decompresses gzip/br/deflate
# responses but preserves the original Content-Encoding header in the
# forwarded metadata. The body therefore arrives as already-plain bytes
# while still being labelled (e.g.) "br". Detect that case up front so
# we don't pay the CPU cost of a guaranteed-failing brotli/zstd decode
# on every relayed response.
if _looks_like_plain_body(body):
return body, True
decoded = body
for layer in reversed(encodings):
before = decoded