feat: improve task cancellation and error logging in DomainFronter

This commit is contained in:
Abolfazl
2026-05-03 06:49:15 +03:30
parent 9f8c28a309
commit 3079b83181
2 changed files with 36 additions and 11 deletions
+6
View File
@@ -310,6 +310,12 @@ async def _run(config):
await server.start() await server.start()
finally: finally:
await server.stop() await server.stop()
# Cancel any tasks that leaked through (e.g. fire-and-forget pool tasks).
stray = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for t in stray:
t.cancel()
if stray:
await asyncio.gather(*stray, return_exceptions=True)
if __name__ == "__main__": if __name__ == "__main__":
+30 -11
View File
@@ -511,7 +511,7 @@ class DomainFronter:
reader, writer, created = self._pool.pop() reader, writer, created = self._pool.pop()
if (now - created) < self._conn_ttl and not reader.at_eof(): if (now - created) < self._conn_ttl and not reader.at_eof():
# Eagerly replace the connection we just took # Eagerly replace the connection we just took
asyncio.create_task(self._add_conn_to_pool()) self._spawn(self._add_conn_to_pool())
return reader, writer, created return reader, writer, created
try: try:
writer.close() writer.close()
@@ -954,15 +954,21 @@ class DomainFronter:
log.info("H2 multiplexing active — one conn handles all requests") log.info("H2 multiplexing active — one conn handles all requests")
except Exception as e: except Exception as e:
self._record_h2_failure(e) self._record_h2_failure(e)
log.warning("H2 connect failed (%s), using H1 pool fallback", e) log.warning(
"H2 connect failed (%s: %s), using H1 pool fallback",
type(e).__name__,
e or "(no details)",
)
async def _h2_connect_and_warm(self): async def _h2_connect_and_warm(self):
"""Connect H2, pre-warm the Apps Script container, start keepalive.""" """Connect H2, pre-warm the Apps Script container, start keepalive."""
await self._h2_connect() await self._h2_connect()
if self._h2_available(): if self._h2_available():
self._spawn(self._prewarm_script()) self._spawn(self._prewarm_script())
if self._keepalive_task is None or self._keepalive_task.done(): # Always start keepalive — even on startup failure it will retry H2
self._keepalive_task = self._spawn(self._keepalive_loop()) # once the cooldown expires instead of leaving H1-only permanently.
if self._keepalive_task is None or self._keepalive_task.done():
self._keepalive_task = self._spawn(self._keepalive_loop())
async def _prewarm_script(self): async def _prewarm_script(self):
"""Pre-warm Apps Script and detect /dev fast path (no redirect).""" """Pre-warm Apps Script and detect /dev fast path (no redirect)."""
@@ -1015,12 +1021,24 @@ class DomainFronter:
"""Send periodic pings to keep Apps Script warm + H2 connection alive.""" """Send periodic pings to keep Apps Script warm + H2 connection alive."""
while True: while True:
try: try:
await asyncio.sleep(240) # 4 minutes — saves ~90 quota hits/day vs 180s # Keep a conservative cadence to avoid any chance of this loop
# Google's container timeout is ~5 min idle # contending with foreground relay work.
if not self._h2_available(): await asyncio.sleep(240)
# If H2 is absent or still in cooldown, skip this tick.
if self._h2 is None or time.time() < self._h2_disabled_until:
continue
# Reconnect in background when needed, but bound it with a
# timeout so recovery attempts can never stall the loop.
if not self._h2.is_connected:
try: try:
await self._h2.reconnect() await asyncio.wait_for(
self._h2.reconnect(),
timeout=max(self._tls_connect_timeout, 8.0),
)
self._record_h2_success() self._record_h2_success()
log.info("H2 re-established after failure")
except Exception as exc: except Exception as exc:
self._record_h2_failure(exc) self._record_h2_failure(exc)
continue continue
@@ -1813,7 +1831,8 @@ class DomainFronter:
@classmethod @classmethod
def _is_static_asset_url(cls, url: str) -> bool: def _is_static_asset_url(cls, url: str) -> bool:
path = urlparse(url).path.lower() path = urlparse(url).path.lower()
return any(path.endswith(ext) for ext in cls._STATIC_EXTS) # Also match versioned paths like /script.js/v3a4b… or /font.woff2/hash
return any(path.endswith(ext) or f"{ext}/" in path for ext in cls._STATIC_EXTS)
@staticmethod @staticmethod
def _header_value(headers: dict | None, name: str) -> str: def _header_value(headers: dict | None, name: str) -> str:
@@ -1947,8 +1966,8 @@ class DomainFronter:
except Exception as e: except Exception as e:
log.warning( log.warning(
"Batch relay failed, disabling batch mode for %ds cooldown. " "Batch relay failed, disabling batch mode for %ds cooldown. "
"Error: %s", "Error: %s: %s",
self._batch_cooldown, e, self._batch_cooldown, type(e).__name__, e or "(no details)",
) )
self._batch_enabled = False self._batch_enabled = False
self._batch_disabled_at = time.time() self._batch_disabled_at = time.time()