feat: enhance batch processing and warmup logic in proxy server and domain fronter

This commit is contained in:
Abolfazl
2026-05-03 06:02:13 +03:30
parent 64d900cdfe
commit cc906ebbf6
3 changed files with 62 additions and 16 deletions
+42 -12
View File
@@ -201,6 +201,12 @@ class DomainFronter:
"Execution monitor enabled: reporting total every %.0fs",
self._execution_report_interval,
)
log.info(
"Batch config: micro=%.0fms macro=%.0fms max=%d",
self._batch_window_micro * 1000.0,
self._batch_window_macro * 1000.0,
self._batch_max,
)
# Exit node — optional second-hop relay with a non-Google exit IP.
# Useful for sites that block GCP/Apps Script IPs (e.g. ChatGPT).
@@ -881,6 +887,23 @@ class DomainFronter:
# active its _keepalive_loop skips the ping; they do not double-fire.
self._spawn(self._h1_container_keepalive())
async def wait_until_warm(self, timeout: float | None = None) -> bool:
"""Start warmup and wait until the initial pool-open phase finishes.
Returns True if warmup finished before timeout, else False.
"""
await self._warm_pool()
if self._pool_ready.is_set():
return True
try:
if timeout is None or timeout <= 0:
await self._pool_ready.wait()
else:
await asyncio.wait_for(self._pool_ready.wait(), timeout=timeout)
return True
except asyncio.TimeoutError:
return False
def _spawn(self, coro) -> asyncio.Task:
"""Create a task and keep a strong reference for clean cancellation."""
task = asyncio.create_task(coro)
@@ -1801,20 +1824,32 @@ class DomainFronter:
if method not in {"GET", "HEAD"} or body:
return True
# Static assets are safe to batch in parallel as independent requests.
is_static = cls._is_static_asset_url(url)
if headers:
for name in STATEFUL_HEADER_NAMES:
# Hard stateful markers: preserve strict ordering / isolation.
for name in ("cookie", "authorization", "proxy-authorization"):
if cls._header_value(headers, name):
return True
accept = cls._header_value(headers, "accept").lower()
if "text/html" in accept or "application/json" in accept:
if "text/html" in accept:
return True
fetch_mode = cls._header_value(headers, "sec-fetch-mode").lower()
if fetch_mode in {"navigate", "cors"}:
if fetch_mode == "navigate":
return True
return not cls._is_static_asset_url(url)
fetch_dest = cls._header_value(headers, "sec-fetch-dest").lower()
if fetch_dest in {"document", "iframe", "frame"}:
return True
# Non-static JSON/API calls are treated as stateful by default.
if (not is_static) and "application/json" in accept:
return True
return not is_static
# ── Batch collector ───────────────────────────────────────────
@@ -2241,14 +2276,9 @@ class DomainFronter:
payloads: list[dict]) -> list[bytes]:
"""Parse a batch response body into individual results."""
text = resp_body.decode(errors="replace").strip()
try:
data = json.loads(text)
except json.JSONDecodeError:
m = re.search(r'\{.*\}', text, re.DOTALL)
try:
data = json.loads(m.group()) if m else None
except json.JSONDecodeError:
data = None
# Apps Script can wrap JSON inside an HTML shell; reuse the same
# robust loader used by single-response parsing.
data = load_relay_json(text)
if not data:
raise RuntimeError(f"Bad batch response: {text[:200]}")