feat: (Needs redeploy Code.gs) enhance SNI pool management with latency probing and JSON handling improvements

This commit is contained in:
Abolfazl
2026-05-01 05:56:56 +03:30
parent 749a2bed81
commit 4ae0d115c5
2 changed files with 187 additions and 20 deletions
+6 -3
View File
@@ -1,6 +1,6 @@
/** /**
* MasterHttpRelay — Google Apps Script * MasterHttpRelay — Google Apps Script
* *
* DEPLOYMENT: * DEPLOYMENT:
* 1. Go to https://script.google.com → New project * 1. Go to https://script.google.com → New project
* 2. Delete the default code, paste THIS entire file * 2. Delete the default code, paste THIS entire file
@@ -181,7 +181,10 @@ function doGet(e) {
} }
function _json(obj) { function _json(obj) {
return ContentService.createTextOutput(JSON.stringify(obj)).setMimeType( // HtmlService responses can stay on script.google.com for /dev, while
ContentService.MimeType.JSON // ContentService commonly bounces through script.googleusercontent.com.
// The Python client extracts the JSON payload from the body either way.
return HtmlService.createHtmlOutput(JSON.stringify(obj)).setXFrameOptionsMode(
HtmlService.XFrameOptionsMode.ALLOWALL
); );
} }
+181 -17
View File
@@ -10,12 +10,14 @@ returns the response.
import asyncio import asyncio
import base64 import base64
import codecs
import hashlib import hashlib
import json import json
import logging import logging
import re import re
import socket import socket
import ssl import ssl
import statistics
import tempfile import tempfile
import time import time
from dataclasses import dataclass from dataclasses import dataclass
@@ -82,8 +84,12 @@ def _build_sni_pool(front_domain: str, overrides: list | None) -> list[str]:
return out return out
fd = (front_domain or "").lower().rstrip(".") fd = (front_domain or "").lower().rstrip(".")
if fd.endswith(".google.com") or fd == "google.com": if fd.endswith(".google.com") or fd == "google.com":
# Ensure the configured front_domain is first (stable default). # For Google fronting we prefer the curated pool order, which can be
pool = [fd] + [h for h in FRONT_SNI_POOL_GOOGLE if h != fd] # latency-biased for common censored networks. Include the configured
# front_domain if it is custom, but do not pin it first.
pool = list(FRONT_SNI_POOL_GOOGLE)
if fd and fd not in pool:
pool.insert(0, fd)
return pool return pool
return [fd] if fd else ["www.google.com"] return [fd] if fd else ["www.google.com"]
@@ -112,6 +118,7 @@ class DomainFronter:
self.sni_host, config.get("front_domains"), self.sni_host, config.get("front_domains"),
) )
self._sni_idx = 0 self._sni_idx = 0
self._sni_probe_task: asyncio.Task | None = None
self.http_host = "script.google.com" self.http_host = "script.google.com"
# Multi-script round-robin for higher throughput # Multi-script round-robin for higher throughput
script = config.get("script_ids") or config.get("script_id") script = config.get("script_ids") or config.get("script_id")
@@ -145,6 +152,7 @@ class DomainFronter:
self._tls_connect_timeout = self._cfg_float( self._tls_connect_timeout = self._cfg_float(
config, "tls_connect_timeout", TLS_CONNECT_TIMEOUT, minimum=1.0, config, "tls_connect_timeout", TLS_CONNECT_TIMEOUT, minimum=1.0,
) )
self._sni_probe_timeout = min(self._tls_connect_timeout, 4.0)
self._max_response_body_bytes = self._cfg_int( self._max_response_body_bytes = self._cfg_int(
config, "max_response_body_bytes", MAX_RESPONSE_BODY_BYTES, config, "max_response_body_bytes", MAX_RESPONSE_BODY_BYTES,
minimum=1024, minimum=1024,
@@ -317,6 +325,125 @@ class DomainFronter:
self._sni_idx += 1 self._sni_idx += 1
return sni return sni
async def _ensure_sni_ranked(self) -> None:
if len(self._sni_hosts) <= 1:
return
task = self._sni_probe_task
if task is None:
task = self._spawn(self._rank_sni_hosts())
self._sni_probe_task = task
try:
await task
except Exception as exc:
log.debug("SNI probe failed: %s", exc)
async def _rank_sni_hosts(self) -> None:
sid = self._script_ids[0]
original = list(self._sni_hosts)
ranked: list[tuple[float, str]] = []
failed: list[str] = []
for sni in original:
result = await self._probe_sni_latency(sni, sid)
if result is None:
failed.append(sni)
else:
ranked.append((result, sni))
if not ranked:
return
ranked.sort(key=lambda item: item[0])
reordered = [sni for _, sni in ranked] + failed
if reordered == original:
log.info(
"SNI probe kept order: %s",
", ".join(f"{sni} ({ms:.0f}ms)" for ms, sni in ranked),
)
return
self._sni_hosts = reordered
self._sni_idx = 0
if self._h2 is not None:
self._h2._sni_hosts = list(reordered)
self._h2._sni_idx = 0
log.info(
"SNI pool re-ranked by local probe: %s",
", ".join(f"{sni} ({ms:.0f}ms)" for ms, sni in ranked),
)
if failed:
log.info("SNI probe timed out: %s", ", ".join(failed))
async def _probe_sni_latency(self, sni: str, sid: str) -> float | None:
samples: list[float] = []
for _ in range(2):
sample = await self._probe_sni_latency_once(sni, sid)
if sample is not None:
samples.append(sample)
if not samples:
return None
return statistics.median(samples)
async def _probe_sni_latency_once(self, sni: str, sid: str) -> float | None:
payload = json.dumps(
{"m": "GET", "u": "http://example.com/", "k": self.auth_key}
).encode()
request = (
f"POST /macros/s/{sid}/exec HTTP/1.1\r\n"
f"Host: {self.http_host}\r\n"
"Content-Type: application/json\r\n"
f"Content-Length: {len(payload)}\r\n"
"Connection: close\r\n\r\n"
).encode() + payload
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setblocking(False)
started = time.perf_counter()
reader = None
writer = None
try:
await asyncio.wait_for(
loop.sock_connect(sock, (self.connect_host, 443)),
timeout=self._sni_probe_timeout,
)
reader, writer = await asyncio.wait_for(
asyncio.open_connection(
sock=sock,
ssl=self._ssl_ctx(),
server_hostname=sni,
),
timeout=self._sni_probe_timeout,
)
writer.write(request)
await asyncio.wait_for(writer.drain(), timeout=self._sni_probe_timeout)
head = b""
while b"\r\n\r\n" not in head and len(head) < 8192:
chunk = await asyncio.wait_for(
reader.read(512), timeout=self._sni_probe_timeout,
)
if not chunk:
break
head += chunk
if not head.startswith(b"HTTP/"):
return None
return (time.perf_counter() - started) * 1000
except Exception:
return None
finally:
if writer is not None:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
elif sock.fileno() != -1:
try:
sock.close()
except Exception:
pass
async def _acquire(self): async def _acquire(self):
"""Get a healthy TLS connection from pool (TTL-checked) or open new.""" """Get a healthy TLS connection from pool (TTL-checked) or open new."""
now = asyncio.get_running_loop().time() now = asyncio.get_running_loop().time()
@@ -770,6 +897,8 @@ class DomainFronter:
if self._warmed: if self._warmed:
return return
self._warmed = True self._warmed = True
if self._sni_probe_task is None and len(self._sni_hosts) > 1:
self._sni_probe_task = self._spawn(self._rank_sni_hosts())
self._warm_task = self._spawn(self._do_warm()) self._warm_task = self._spawn(self._do_warm())
# Start continuous pool maintenance # Start continuous pool maintenance
if self._maintenance_task is None: if self._maintenance_task is None:
@@ -821,6 +950,7 @@ class DomainFronter:
if time.time() < self._h2_disabled_until: if time.time() < self._h2_disabled_until:
return return
try: try:
await self._ensure_sni_ranked()
await self._h2.ensure_connected() await self._h2.ensure_connected()
self._record_h2_success() self._record_h2_success()
log.info("H2 multiplexing active — one conn handles all requests") log.info("H2 multiplexing active — one conn handles all requests")
@@ -839,7 +969,7 @@ class DomainFronter:
async def _prewarm_script(self): async def _prewarm_script(self):
"""Pre-warm Apps Script and detect /dev fast path (no redirect).""" """Pre-warm Apps Script and detect /dev fast path (no redirect)."""
payload = json.dumps( payload = json.dumps(
{"m": "HEAD", "u": "http://example.com/", "k": self.auth_key} {"m": "GET", "u": "http://example.com/", "k": self.auth_key}
).encode() ).encode()
hdrs = {"content-type": "application/json"} hdrs = {"content-type": "application/json"}
sid = self._script_ids[0] sid = self._script_ids[0]
@@ -857,7 +987,7 @@ class DomainFronter:
timeout=15, timeout=15,
) )
dt = (time.perf_counter() - t0) * 1000 dt = (time.perf_counter() - t0) * 1000
data = json.loads(body.decode(errors="replace")) data = self._load_relay_json(body.decode(errors="replace"))
if "s" in data: if "s" in data:
self._dev_available = True self._dev_available = True
log.info("/dev fast path active (%.0fms, no redirect)", dt) log.info("/dev fast path active (%.0fms, no redirect)", dt)
@@ -899,7 +1029,7 @@ class DomainFronter:
await self._h2.ping() await self._h2.ping()
# Apps Script keepalive — warm the container # Apps Script keepalive — warm the container
payload = {"m": "HEAD", "u": "http://example.com/", "k": self.auth_key} payload = {"m": "GET", "u": "http://example.com/", "k": self.auth_key}
path = self._exec_path("example.com") path = self._exec_path("example.com")
t0 = time.perf_counter() t0 = time.perf_counter()
await asyncio.wait_for( await asyncio.wait_for(
@@ -931,7 +1061,7 @@ class DomainFronter:
if self._h2_available(): if self._h2_available():
continue # H2 keepalive is already pinging, skip continue # H2 keepalive is already pinging, skip
payload = self._build_payload( payload = self._build_payload(
"HEAD", "http://example.com/", {}, b"" "GET", "http://example.com/", {}, b""
) )
t0 = time.perf_counter() t0 = time.perf_counter()
# _relay_payload_h1 has its own per-attempt timeout internally; # _relay_payload_h1 has its own per-attempt timeout internally;
@@ -947,6 +1077,7 @@ class DomainFronter:
async def _do_warm(self): async def _do_warm(self):
"""Open WARM_POOL_COUNT connections in parallel — failures are fine.""" """Open WARM_POOL_COUNT connections in parallel — failures are fine."""
await self._ensure_sni_ranked()
count = WARM_POOL_COUNT count = WARM_POOL_COUNT
coros = [self._add_conn_to_pool() for _ in range(count)] coros = [self._add_conn_to_pool() for _ in range(count)]
results = await asyncio.gather(*coros, return_exceptions=True) results = await asyncio.gather(*coros, return_exceptions=True)
@@ -2122,20 +2253,53 @@ class DomainFronter:
if not text: if not text:
return self._error_response(502, "Empty response from relay") return self._error_response(502, "Empty response from relay")
try: data = self._load_relay_json(text)
data = json.loads(text) if data is None:
except json.JSONDecodeError: return self._error_response(502, f"No JSON: {text[:200]}")
m = re.search(r'\{.*\}', text, re.DOTALL)
if m:
try:
data = json.loads(m.group())
except json.JSONDecodeError:
return self._error_response(502, f"Bad JSON: {text[:200]}")
else:
return self._error_response(502, f"No JSON: {text[:200]}")
return self._parse_relay_json(data) return self._parse_relay_json(data)
@staticmethod
def _load_relay_json(text: str) -> dict | None:
try:
return json.loads(text)
except json.JSONDecodeError:
wrapped = DomainFronter._extract_apps_script_user_html(text)
if wrapped:
data = DomainFronter._load_relay_json(wrapped)
if data is not None:
return data
match = re.search(r'\{.*\}', text, re.DOTALL)
if not match:
return None
try:
data = json.loads(match.group())
except json.JSONDecodeError:
return None
return data if isinstance(data, dict) else None
@staticmethod
def _extract_apps_script_user_html(text: str) -> str | None:
marker = 'goog.script.init("'
start = text.find(marker)
if start == -1:
return None
start += len(marker)
end = text.find('", "", undefined', start)
if end == -1:
return None
encoded = text[start:end]
try:
decoded = codecs.decode(encoded, "unicode_escape")
payload = json.loads(decoded)
except Exception:
return None
user_html = payload.get("userHtml")
return user_html if isinstance(user_html, str) else None
def _parse_relay_json(self, data: dict) -> bytes: def _parse_relay_json(self, data: dict) -> bytes:
"""Convert a parsed relay JSON dict to raw HTTP response bytes.""" """Convert a parsed relay JSON dict to raw HTTP response bytes."""
if "e" in data: if "e" in data: