feat: implement gzip compression for relay responses and enhance H2 transport with keepalive pings

This commit is contained in:
Abolfazl
2026-05-06 15:54:11 +03:30
parent c7e0d72635
commit a5d25f608f
5 changed files with 479 additions and 109 deletions
+27 -6
View File
@@ -55,6 +55,21 @@ function doPost(e) {
} }
} }
// Gzip a response body when that actually makes it smaller, to reduce the
// number of bytes sent back over the wire.
// Returns {b: <gzipped bytes>, gz: true} when the compressed form is strictly
// shorter than the input; otherwise (no saving, or gzip threw) returns
// {b: <the input, untouched>, gz: false}.
function _maybeGzip(bytes) {
  // Start from the uncompressed answer; only replace it on a proven win.
  var outcome = { b: bytes, gz: false };
  try {
    var packed = Utilities.gzip(Utilities.newBlob(bytes)).getBytes();
    if (packed.length < bytes.length) {
      outcome = { b: packed, gz: true };
    }
  } catch (e) {
    // Best effort: any gzip failure leaves the uncompressed outcome in place.
  }
  return outcome;
}
function _doSingle(req) { function _doSingle(req) {
if (!req.u || typeof req.u !== "string" || !req.u.match(/^https?:\/\//i)) { if (!req.u || typeof req.u !== "string" || !req.u.match(/^https?:\/\//i)) {
return _json({ e: "bad url" }); return _json({ e: "bad url" });
@@ -68,11 +83,14 @@ function _doSingle(req) {
} }
var opts = _buildOpts(req); var opts = _buildOpts(req);
var resp = UrlFetchApp.fetch(req.u, opts); var resp = UrlFetchApp.fetch(req.u, opts);
return _json({ var gz = _maybeGzip(resp.getContent());
var result = {
s: resp.getResponseCode(), s: resp.getResponseCode(),
h: _respHeaders(resp), h: _respHeaders(resp),
b: Utilities.base64Encode(resp.getContent()), b: Utilities.base64Encode(gz.b),
}); };
if (gz.gz) result.gz = 1;
return _json(result);
} }
function _doBatch(items) { function _doBatch(items) {
@@ -149,11 +167,14 @@ function _doBatch(items) {
if (!resp) { if (!resp) {
results.push({ e: "fetch failed" }); results.push({ e: "fetch failed" });
} else { } else {
results.push({ var gz = _maybeGzip(resp.getContent());
var item = {
s: resp.getResponseCode(), s: resp.getResponseCode(),
h: _respHeaders(resp), h: _respHeaders(resp),
b: Utilities.base64Encode(resp.getContent()), b: Utilities.base64Encode(gz.b),
}); };
if (gz.gz) item.gz = 1;
results.push(item);
} }
} }
} }
+2
View File
@@ -17,6 +17,8 @@
"tls_connect_timeout": 15, "tls_connect_timeout": 15,
"tcp_connect_timeout": 10, "tcp_connect_timeout": 10,
"parallel_relay": 1, "parallel_relay": 1,
"h2_connections": 2,
"enable_sub_batch": true,
"block_hosts": [ "block_hosts": [
"ads.example.com", "ads.example.com",
".doubleclick.net" ".doubleclick.net"
+361 -98
View File
@@ -90,8 +90,16 @@ def _mask_sid(sid: str) -> str:
class DomainFronter: class DomainFronter:
_STATIC_EXTS = STATIC_EXTS _STATIC_EXTS = STATIC_EXTS
_H2_FAILURE_COOLDOWN = 60.0 _H2_FAILURE_COOLDOWN = 15.0 # reduced: DPI token bucket refills in ~8-10s
_H2_FAILURE_THRESHOLD = 3 _H2_FAILURE_THRESHOLD = 5 # raised: needs genuine consecutive failures
# URL extensions that almost always produce large responses (fonts, images,
# media). These are isolated into their own H2 sub-batch so a 400 kB font
# doesn't block a 2 kB JS file waiting for the same Apps Script response.
_HEAVY_EXTENSIONS = frozenset({
"woff2", "woff", "ttf", "eot", "otf",
"jpg", "jpeg", "png", "gif", "webp", "avif", "ico",
"mp4", "mp3", "wav", "webm", "ogg", "flac",
})
_DOWNLOAD_STREAM_COOLDOWN = 300.0 _DOWNLOAD_STREAM_COOLDOWN = 300.0
_COALESCE_VARY_HEADERS = ( _COALESCE_VARY_HEADERS = (
"accept", "accept",
@@ -181,12 +189,21 @@ class DomainFronter:
self._batch_lock = asyncio.Lock() self._batch_lock = asyncio.Lock()
self._batch_pending: list[tuple[dict, asyncio.Future]] = [] self._batch_pending: list[tuple[dict, asyncio.Future]] = []
self._batch_task: asyncio.Task | None = None self._batch_task: asyncio.Task | None = None
self._batch_window_micro = BATCH_WINDOW_MICRO self._batch_window_micro = float(config.get("batch_window_micro", BATCH_WINDOW_MICRO))
self._batch_window_macro = BATCH_WINDOW_MACRO self._batch_window_macro = float(config.get("batch_window_macro", BATCH_WINDOW_MACRO))
self._batch_max = BATCH_MAX self._batch_max = int(config.get("batch_max", BATCH_MAX))
self._batch_enabled = True # enable_batch=false → each request gets its own H2 stream → N×2 KiB/s
# aggregate throughput instead of all requests sharing one stream.
# Recommended when DPI does per-stream rate limiting (e.g. Iran).
self._batch_permanent_disable: bool = not bool(config.get("enable_batch", True))
self._batch_enabled = not self._batch_permanent_disable
self._batch_disabled_at = 0.0 self._batch_disabled_at = 0.0
self._batch_cooldown = 60 self._batch_cooldown = 60
# enable_sub_batch=false → all batches are sent as a single Apps Script
# call regardless of how many H2 connections are live. Saves quota at
# the cost of parallel DPI bypass (each connection no longer gets its
# own token bucket). Useful when quota is the binding constraint.
self._sub_batch_enabled: bool = bool(config.get("enable_sub_batch", True))
# Request coalescing — dedup concurrent identical GETs # Request coalescing — dedup concurrent identical GETs
self._coalesce: dict[str, list[asyncio.Future]] = {} self._coalesce: dict[str, list[asyncio.Future]] = {}
@@ -194,17 +211,39 @@ class DomainFronter:
self._h2_disabled_until = 0.0 self._h2_disabled_until = 0.0
self._stream_download_disabled_until: dict[str, float] = {} self._stream_download_disabled_until: dict[str, float] = {}
# HTTP/2 multiplexing — one connection handles all requests # HTTP/2 multiplexing — pool of parallel connections for DPI bypass.
# Iran's DPI shapes per-TCP-connection; N separate connections each
# get their own independent token bucket, giving ~N× throughput.
self._h2 = None self._h2 = None
self._h2_pool: list = []
self._h2_pool_idx: int = 0
try: try:
from .h2_transport import H2Transport, H2_AVAILABLE from .h2_transport import H2Transport, H2_AVAILABLE
if H2_AVAILABLE: if H2_AVAILABLE:
self._h2 = H2Transport( try:
self.connect_host, self.sni_host, self.verify_ssl, n_conns = max(1, int(config.get("h2_connections", 3)))
sni_hosts=self._sni_hosts, except (TypeError, ValueError):
n_conns = 3
no_sni = bool(config.get("no_sni", False))
try:
ping_interval = float(config.get("ping_interval", 0.2))
except (TypeError, ValueError):
ping_interval = 0.2
self._h2_pool = [
H2Transport(
self.connect_host, self.sni_host, self.verify_ssl,
sni_hosts=self._sni_hosts,
no_sni=no_sni,
ping_interval=ping_interval,
)
for _ in range(n_conns)
]
self._h2 = self._h2_pool[0] # primary; used for ping/reconnect
log.info(
"HTTP/2 multiplexing available — %d parallel connections "
"(each gets its own DPI token bucket)",
n_conns,
) )
log.info("HTTP/2 multiplexing available — "
"all requests will share one connection")
except ImportError: except ImportError:
pass pass
@@ -218,12 +257,19 @@ class DomainFronter:
"Execution monitor enabled: reporting total every %.0fs", "Execution monitor enabled: reporting total every %.0fs",
self._execution_report_interval, self._execution_report_interval,
) )
log.info( if self._batch_permanent_disable:
"Batch config: micro=%.0fms macro=%.0fms max=%d", log.info(
self._batch_window_micro * 1000.0, "Batch DISABLED (enable_batch=false) — each request fires its own "
self._batch_window_macro * 1000.0, "H2 stream for N×2 KiB/s aggregate throughput"
self._batch_max, )
) else:
log.info(
"Batch config: micro=%.0fms macro=%.0fms max=%d sub_batch=%s",
self._batch_window_micro * 1000.0,
self._batch_window_macro * 1000.0,
self._batch_max,
"on" if self._sub_batch_enabled else "off",
)
# Exit node — optional second-hop relay with a non-Google exit IP. # Exit node — optional second-hop relay with a non-Google exit IP.
# Useful for sites that block GCP/Apps Script IPs (e.g. ChatGPT). # Useful for sites that block GCP/Apps Script IPs (e.g. ChatGPT).
@@ -318,11 +364,30 @@ class DomainFronter:
return self._ssl_context return self._ssl_context
def _h2_available(self) -> bool: def _h2_available(self) -> bool:
return ( if not self._h2_pool or time.time() < self._h2_disabled_until:
self._h2 is not None return False
and self._h2.is_connected return any(t.is_connected for t in self._h2_pool)
and time.time() >= self._h2_disabled_until
def _pick_h2(self):
    """Return the next connected transport from the H2 pool, round-robin.

    Scanning starts at ``self._h2_pool_idx`` and wraps around the pool once.
    The first transport whose ``is_connected`` is truthy is returned and the
    index is moved to the slot right after it, so consecutive calls rotate
    through the live transports.

    Fallbacks:
      * empty pool: returns ``self._h2`` (may be ``None``);
      * no transport connected: advances the index by one and returns the
        first pool entry, leaving failure handling to the caller.
    """
    transports = self._h2_pool
    size = len(transports)
    if size == 0:
        return self._h2

    start = self._h2_pool_idx
    for offset in range(size):
        candidate = transports[(start + offset) % size]
        if not candidate.is_connected:
            continue
        # Resume the next scan just past the transport we hand out.
        self._h2_pool_idx = (start + offset + 1) % size
        return candidate

    # Nothing is connected: still rotate the cursor, then hand back slot 0.
    self._h2_pool_idx = (start + 1) % size
    return transports[0]
def _record_h2_success(self) -> None: def _record_h2_success(self) -> None:
self._h2_failure_streak = 0 self._h2_failure_streak = 0
@@ -346,6 +411,27 @@ class DomainFronter:
type(exc).__name__, type(exc).__name__,
) )
@staticmethod
def _is_h2_transport_error(exc: BaseException) -> bool:
    """Return True only for genuine H2 *transport* failures.

    Request timeouts and application-level errors are not evidence that the
    connection itself is broken, so they must not be counted toward the H2
    failure streak. Only connection-level errors should.

    Args:
        exc: The exception raised while relaying over H2.

    Returns:
        ``False`` for any timeout, ``True`` for socket/TLS level errors, and
        otherwise ``True`` only when the message contains one of the known
        connection-loss phrases.
    """
    # Timeouts must be tested BEFORE the OSError check: the builtin
    # TimeoutError is an OSError subclass, and before Python 3.11
    # asyncio.TimeoutError is a different class from the builtin one. The H2
    # transport raises the builtin TimeoutError for a stream timeout, so
    # checking only asyncio.TimeoutError misclassifies it on older runtimes.
    if isinstance(exc, (asyncio.TimeoutError, TimeoutError)):
        return False
    if isinstance(exc, (ConnectionError, OSError, ssl.SSLError)):
        return True
    # Fall back to message sniffing for errors raised as generic exceptions.
    msg = str(exc).lower()
    return any(k in msg for k in (
        "connection closed", "connection lost", "stream error",
        "alpn negotiation", "transport closed", "h2 reader",
        "eof", "broken pipe",
    ))
def _stream_download_allowed(self, url: str) -> bool: def _stream_download_allowed(self, url: str) -> bool:
host = self._host_key(url) host = self._host_key(url)
if not host: if not host:
@@ -444,9 +530,9 @@ class DomainFronter:
self._sni_hosts = reordered self._sni_hosts = reordered
self._sni_idx = 0 self._sni_idx = 0
if self._h2 is not None: for _t in self._h2_pool:
self._h2._sni_hosts = list(reordered) _t._sni_hosts = list(reordered)
self._h2._sni_idx = 0 _t._sni_idx = 0
log.info( log.info(
"SNI pool re-ranked by local probe: %s", "SNI pool re-ranked by local probe: %s",
", ".join(f"{sni} ({ms:.0f}ms)" for ms, sni in ranked), ", ".join(f"{sni} ({ms:.0f}ms)" for ms, sni in ranked),
@@ -956,23 +1042,38 @@ class DomainFronter:
await self._flush_pool() await self._flush_pool()
if self._h2: for _t in self._h2_pool:
try: try:
await self._h2.close() await _t.close()
except Exception as exc: except Exception as exc:
log.debug("h2 close: %s", exc) log.debug("h2 pool close: %s", exc)
async def _h2_connect(self): async def _h2_connect(self):
"""Connect the HTTP/2 transport in background.""" """Connect all HTTP/2 transports in the pool."""
if self._h2 is None: if not self._h2_pool:
return return
if time.time() < self._h2_disabled_until: if time.time() < self._h2_disabled_until:
return return
try: try:
await self._ensure_sni_ranked() await self._ensure_sni_ranked()
await self._h2.ensure_connected() results = await asyncio.gather(
self._record_h2_success() *[t.ensure_connected() for t in self._h2_pool],
log.info("H2 multiplexing active — one conn handles all requests") return_exceptions=True,
)
connected = sum(1 for r in results if not isinstance(r, Exception))
if connected > 0:
self._record_h2_success()
log.info(
"H2 multiplexing active — %d/%d connections live",
connected, len(self._h2_pool),
)
else:
exc = next(r for r in results if isinstance(r, Exception))
self._record_h2_failure(exc)
log.warning(
"H2 connect failed (%s: %s), using H1 pool fallback",
type(exc).__name__, exc or "(no details)",
)
except Exception as e: except Exception as e:
self._record_h2_failure(e) self._record_h2_failure(e)
log.warning( log.warning(
@@ -1042,30 +1143,43 @@ class DomainFronter:
"""Send periodic pings to keep Apps Script warm + H2 connection alive.""" """Send periodic pings to keep Apps Script warm + H2 connection alive."""
while True: while True:
try: try:
# Keep a conservative cadence to avoid any chance of this loop # 60s cadence: Iran DPI/NAT can drop idle connections in ~30-60s.
# contending with foreground relay work. # Pinging every 60s keeps all pool members alive without burning
await asyncio.sleep(240) # significant Apps Script quota.
await asyncio.sleep(60)
# If H2 is absent or still in cooldown, skip this tick. # If H2 is absent or still in cooldown, skip this tick.
if self._h2 is None or time.time() < self._h2_disabled_until: if self._h2 is None or time.time() < self._h2_disabled_until:
continue continue
# Reconnect in background when needed, but bound it with a # Reconnect any disconnected pool members.
# timeout so recovery attempts can never stall the loop. for _t in list(self._h2_pool):
if not self._h2.is_connected: if not _t.is_connected:
try: try:
await asyncio.wait_for( await asyncio.wait_for(
self._h2.reconnect(), _t.reconnect(),
timeout=max(self._tls_connect_timeout, 8.0), timeout=max(self._tls_connect_timeout, 8.0),
) )
self._record_h2_success() self._record_h2_success()
log.info("H2 re-established after failure") log.info("H2 connection re-established")
except Exception as exc: except Exception as exc:
self._record_h2_failure(exc) # Keepalive reconnect failures are background recovery
continue # attempts — do NOT count them toward the disable
# threshold or healthy traffic gets penalised.
log.debug("H2 background reconnect failed: %s", exc)
# H2 PING to keep connection alive if not any(t.is_connected for t in self._h2_pool):
await self._h2.ping() continue # all transports down — skip ping
# H2 PING frame to every connected pool member.
# This tells each OS/DPI that the TCP connection is still in use,
# preventing the 30-60s idle-reset that Iran DPI applies.
for _t in self._h2_pool:
if _t.is_connected:
try:
await _t.ping()
except Exception:
pass
# Apps Script keepalive — warm the container # Apps Script keepalive — warm the container
payload = {"m": "GET", "u": "http://example.com/", "k": self.auth_key} payload = {"m": "GET", "u": "http://example.com/", "k": self.auth_key}
@@ -1128,6 +1242,24 @@ class DomainFronter:
# stop waiting on the first request. # stop waiting on the first request.
self._pool_ready.set() self._pool_ready.set()
async def _reconnect_pool_members(self) -> None:
    """Try to reconnect every H2 pool member that is currently disconnected.

    Transports are visited one after another. Each reconnect attempt is
    bounded by ``max(self._tls_connect_timeout, 8.0)`` seconds. Success and
    failure are only logged at debug level; the H2 failure/success counters
    are not touched here and no exception from a reconnect attempt
    propagates to the caller.
    """
    for transport in self._h2_pool:
        if transport.is_connected:
            continue  # healthy member, nothing to recover
        try:
            attempt = transport.reconnect()
            limit = max(self._tls_connect_timeout, 8.0)
            await asyncio.wait_for(attempt, timeout=limit)
            log.debug("H2 pool member recovered")
        except Exception as exc:
            # Recovery is best effort; a later attempt may succeed.
            log.debug("H2 pool member reconnect failed: %s", exc)
def _auth_header(self) -> str: def _auth_header(self) -> str:
return f"X-Auth-Key: {self.auth_key}\r\n" if self.auth_key else "" return f"X-Auth-Key: {self.auth_key}\r\n" if self.auth_key else ""
@@ -1922,7 +2054,8 @@ class DomainFronter:
# If batching is disabled, retry enabling it after a cooldown. # If batching is disabled, retry enabling it after a cooldown.
if not self._batch_enabled: if not self._batch_enabled:
if ( if (
self._batch_disabled_at > 0 not self._batch_permanent_disable
and self._batch_disabled_at > 0
and (time.time() - self._batch_disabled_at) >= self._batch_cooldown and (time.time() - self._batch_disabled_at) >= self._batch_cooldown
): ):
self._batch_enabled = True self._batch_enabled = True
@@ -1980,8 +2113,78 @@ class DomainFronter:
self._batch_task = None self._batch_task = None
self._spawn(self._batch_send(batch)) self._spawn(self._batch_send(batch))
@staticmethod
def _split_list(lst: list, n: int) -> list[list]:
    """Split *lst* into at most *n* roughly-equal contiguous chunks.

    Chunk sizes differ by at most one, with the larger chunks first. No
    empty chunk is ever produced, and every item of *lst* appears in exactly
    one chunk, in the original order.

    Args:
        lst: Items to split.
        n: Desired number of chunks. Values above ``len(lst)`` are clamped
            down; values below 1 are clamped up to 1.

    Returns:
        A list of chunks; ``[]`` when *lst* is empty.
    """
    if not lst:
        # Nothing to split; also avoids a division by zero below.
        return []
    # Clamp into [1, len(lst)]: n == 0 would divide by zero and a negative n
    # would make range() empty, silently dropping every item.
    n = max(1, min(n, len(lst)))
    k, rem = divmod(len(lst), n)
    chunks, start = [], 0
    for i in range(n):
        # The first `rem` chunks absorb the remainder, one extra item each.
        size = k + (1 if i < rem else 0)
        chunks.append(lst[start:start + size])
        start += size
    return chunks
@staticmethod
def _url_ext(url: str) -> str:
    """Return the lowercase file extension of a URL's path, or ``""``.

    Only the last path segment is inspected, so a dot inside a directory
    name (``/api/v1.2/items``) is not mistaken for an extension. Query
    string and fragment are ignored because only the parsed path is used.

    Args:
        url: The URL to inspect.

    Returns:
        The text after the final dot of the last path segment, lowercased;
        ``""`` when there is no dot, when *url* is not a string, or when the
        URL cannot be parsed.
    """
    if not isinstance(url, str):
        return ""
    try:
        path = urlparse(url).path
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return ""
    filename = path.rsplit("/", 1)[-1]
    _, dot, ext = filename.rpartition(".")
    return ext.lower() if dot else ""
def _make_sub_batches(self, batch: list, n_connections: int) -> list[list]:
    """Partition *batch* into sub-batches that keep heavy and light items apart.

    An item is "heavy" when the extension of its payload URL (``item[0]["u"]``)
    is listed in ``self._HEAVY_EXTENSIONS``; everything else is "light".

    Rules, in order:
      * ``n_connections <= 1``: one sub-batch holding the whole batch;
      * no heavy items: one sub-batch, so the light items are not split;
      * no light items: the batch is split over up to ``n_connections``
        sub-batches;
      * mixed: heavy items are spread over up to ``n_connections - 1``
        sub-batches and light items take the remaining slot(s). Light items
        are split only when more than one slot is left and there are enough
        of them to fill every slot.

    Returns:
        The list of sub-batches, with empty ones filtered out in the mixed case.
    """
    if n_connections <= 1:
        return [batch]

    heavy_items: list = []
    light_items: list = []
    for entry in batch:
        extension = self._url_ext(entry[0].get("u", ""))
        if extension in self._HEAVY_EXTENSIONS:
            heavy_items.append(entry)
        else:
            light_items.append(entry)

    if not heavy_items:
        # Only light items: keep them together as a single sub-batch.
        return [batch]

    if not light_items:
        # Only heavy items: spread them across the available connections.
        return self._split_list(batch, min(n_connections, len(batch)))

    # Mixed batch: heavy items may use all but one slot, light items get
    # whatever is left over.
    heavy_slots = min(n_connections - 1, len(heavy_items))
    groups = self._split_list(heavy_items, heavy_slots)
    light_slots = n_connections - len(groups)
    if light_slots > 1 and len(light_items) >= light_slots:
        groups += self._split_list(light_items, light_slots)
    else:
        groups.append(light_items)
    return [group for group in groups if group]
async def _batch_send(self, batch: list): async def _batch_send(self, batch: list):
"""Send a batch of requests. Uses fetchAll for multi, single for one.""" """Send a batch of requests, split across H2 connections for parallel throughput.
Iran's DPI shapes per-TCP-connection. A 600 kB response over one
connection at 40 KB/s takes ~15s. Splitting the same batch across 3
connections means each carries ~200 kB → ~5s, all in parallel → 3×
faster wall-clock. Each sub-batch is an independent Apps Script
fetchAll call on a separate H2 transport.
"""
if len(batch) == 1: if len(batch) == 1:
payload, future = batch[0] payload, future = batch[0]
try: try:
@@ -1991,26 +2194,86 @@ class DomainFronter:
except Exception as e: except Exception as e:
if not future.done(): if not future.done():
future.set_result(error_response(502, str(e))) future.set_result(error_response(502, str(e)))
else: return
log.info("Batch relay: %d requests", len(batch))
try: # Determine how many live H2 connections to split across.
results = await self._relay_batch([p for p, _ in batch]) n_live = (
for (_, future), result in zip(batch, results): sum(1 for t in self._h2_pool if t.is_connected)
if not future.done(): if self._h2_pool else 0
future.set_result(result) )
except Exception as e: n_splits = min(n_live, len(batch)) if (n_live > 1 and self._sub_batch_enabled) else 1
log.warning(
"Batch relay failed, disabling batch mode for %ds cooldown. " if n_splits > 1:
"Error: %s: %s", # Build size-aware sub-batches: heavy files (fonts, images) get
self._batch_cooldown, type(e).__name__, e or "(no details)", # their own H2 connection so light files don't wait for them.
) chunks = self._make_sub_batches(batch, n_splits)
self._batch_enabled = False
self._batch_disabled_at = time.time() heavy_count = sum(
# Fallback: send individually 1 for p, _ in batch
tasks = [] if self._url_ext(p.get("u", "")) in self._HEAVY_EXTENSIONS
for payload, future in batch: )
tasks.append(self._relay_fallback(payload, future)) log.info(
await asyncio.gather(*tasks) "Batch relay: %d requests (%d heavy+%d light) → %d sub-batches (%s)",
len(batch), heavy_count, len(batch) - heavy_count,
len(chunks), "+".join(str(len(c)) for c in chunks),
)
# Wrap each sub-batch relay with timing so slow connections are
# logged and we can correlate them with DPI shaping events.
async def _timed_sub_batch(items: list):
t0 = time.perf_counter()
result = await self._relay_batch([p for p, _ in items])
return result, time.perf_counter() - t0
chunk_results = await asyncio.gather(
*[_timed_sub_batch(c) for c in chunks],
return_exceptions=True,
)
max_dt = 0.0
for chunk, result in zip(chunks, chunk_results):
if isinstance(result, Exception):
log.warning(
"Sub-batch failed (%s: %s), retrying individually",
type(result).__name__, result,
)
for payload, future in chunk:
self._spawn(self._relay_fallback(payload, future))
else:
items_result, dt = result
max_dt = max(max_dt, dt)
if dt > 8.0:
log.warning(
"Slow sub-batch: %.1fs for %d items — DPI shaping?",
dt, len(chunk),
)
for (_, future), raw in zip(chunk, items_result):
if not future.done():
future.set_result(raw)
if max_dt > 0:
log.debug("Batch wall-clock: %.1fs", max_dt)
return
# Single-batch path: H2 unavailable or only one connection live.
log.info("Batch relay: %d requests", len(batch))
try:
results = await self._relay_batch([p for p, _ in batch])
for (_, future), result in zip(batch, results):
if not future.done():
future.set_result(result)
except Exception as e:
log.warning(
"Batch relay failed, disabling batch mode for %ds cooldown. "
"Error: %s: %s",
self._batch_cooldown, type(e).__name__, e or "(no details)",
)
self._batch_enabled = False
self._batch_disabled_at = time.time()
# Fallback: send individually
tasks = []
for payload, future in batch:
tasks.append(self._relay_fallback(payload, future))
await asyncio.gather(*tasks)
async def _relay_fallback(self, payload, future): async def _relay_fallback(self, payload, future):
"""Fallback: relay a single request from a failed batch.""" """Fallback: relay a single request from a failed batch."""
@@ -2041,7 +2304,9 @@ class DomainFronter:
self._record_h2_success() self._record_h2_success()
return result return result
except Exception as e: except Exception as e:
self._record_h2_failure(e) if self._is_h2_transport_error(e):
self._record_h2_failure(e)
self._spawn(self._reconnect_pool_members())
log.debug("Fan-out relay failed (%s), falling back", e) log.debug("Fan-out relay failed (%s), falling back", e)
# fall through to single-path logic below # fall through to single-path logic below
@@ -2055,24 +2320,20 @@ class DomainFronter:
self._record_h2_success() self._record_h2_success()
return result return result
except Exception as e: except Exception as e:
self._record_h2_failure(e) is_transport = self._is_h2_transport_error(e)
if attempt < attempts - 1: if is_transport:
log.debug("H2 relay failed (%s), reconnecting", e) self._record_h2_failure(e)
try: # Spawn background reconnect for any newly-dead transports
await self._h2.reconnect() # so future requests find healthy connections.
# Do NOT record success here — only a successful relay self._spawn(self._reconnect_pool_members())
# response proves the connection works. Recording if attempt < attempts - 1 and self._h2_available():
# success after reconnect was resetting the failure log.debug("H2 relay attempt %d failed (%s: %s), retrying",
# streak and causing an infinite reconnect storm. attempt + 1, type(e).__name__, e)
except Exception as reconnect_exc:
self._record_h2_failure(reconnect_exc)
log.warning("H2 reconnect failed, falling back to H1")
break
else: else:
# Last H2 attempt failed — fall through to H1 rather log.debug(
# than raising here, which would bypass H1 entirely. "H2 relay failed (%s: %s), falling back to H1",
log.debug("H2 relay failed on final attempt (%s), " type(e).__name__, e,
"falling back to H1", e) )
break break
# HTTP/1.1 fallback (pool-based) # HTTP/1.1 fallback (pool-based)
@@ -2141,8 +2402,8 @@ class DomainFronter:
async def _relay_single_h2(self, payload: dict) -> bytes: async def _relay_single_h2(self, payload: dict) -> bytes:
"""Execute a relay through HTTP/2 multiplexing. """Execute a relay through HTTP/2 multiplexing.
Uses the shared H2 connection — no pool checkout needed. Picks a connection from the pool via round-robin so each request
Many concurrent calls all share one TLS connection. benefits from its own DPI token bucket.
""" """
full_payload = dict(payload) full_payload = dict(payload)
full_payload["k"] = self.auth_key full_payload["k"] = self.auth_key
@@ -2152,7 +2413,7 @@ class DomainFronter:
path = self._exec_path_for_sid(sid) path = self._exec_path_for_sid(sid)
self._record_execution(sid) self._record_execution(sid)
status, headers, body = await self._h2.request( status, headers, body = await (self._pick_h2() or self._h2).request(
method="POST", path=path, host=self.http_host, method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"}, headers={"content-type": "application/json"},
body=json_body, body=json_body,
@@ -2174,7 +2435,7 @@ class DomainFronter:
path = self._exec_path_for_sid(sid) path = self._exec_path_for_sid(sid)
self._record_execution(sid) self._record_execution(sid)
status, headers, body = await self._h2.request( status, headers, body = await (self._pick_h2() or self._h2).request(
method="POST", path=path, host=self.http_host, method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"}, headers={"content-type": "application/json"},
body=json_body, body=json_body,
@@ -2285,7 +2546,7 @@ class DomainFronter:
try: try:
self._record_execution(sid) self._record_execution(sid)
status, headers, body = await asyncio.wait_for( status, headers, body = await asyncio.wait_for(
self._h2.request( (self._pick_h2() or self._h2).request(
method="POST", path=path, host=self.http_host, method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"}, headers={"content-type": "application/json"},
body=json_body, body=json_body,
@@ -2295,7 +2556,9 @@ class DomainFronter:
self._record_h2_success() self._record_h2_success()
return self._parse_batch_body(body, payloads) return self._parse_batch_body(body, payloads)
except Exception as e: except Exception as e:
self._record_h2_failure(e) if self._is_h2_transport_error(e):
self._record_h2_failure(e)
self._spawn(self._reconnect_pool_members())
log.debug("H2 batch failed (%s), falling back to H1", e) log.debug("H2 batch failed (%s), falling back to H1", e)
# HTTP/1.1 fallback # HTTP/1.1 fallback
+62 -2
View File
@@ -68,10 +68,23 @@ class H2Transport:
def __init__(self, connect_host: str, sni_host: str, def __init__(self, connect_host: str, sni_host: str,
verify_ssl: bool = True, verify_ssl: bool = True,
sni_hosts: list[str] | None = None): sni_hosts: list[str] | None = None,
no_sni: bool = False,
ping_interval: float = 0.2):
self.connect_host = connect_host self.connect_host = connect_host
self.sni_host = sni_host self.sni_host = sni_host
self.verify_ssl = verify_ssl self.verify_ssl = verify_ssl
# no_sni=True: omit the SNI extension from TLS ClientHello entirely.
# DPI cannot match a hostname → may bypass per-SNI throttle rules.
# Google's GFE accepts SNI-less connections (returns default cert).
# Requires verify_ssl=False since hostname won't be in default cert CN.
self._no_sni: bool = no_sni
# ping_interval: seconds between H2 PING frames sent while waiting
# for a relay response. Fills the TCP silence that Iran DPI uses to
# classify and throttle Apps Script relay connections. Server MUST
# reply with PING ACK → continuous bidirectional flow visible to DPI.
# Set to 0 to disable. Default 0.2s (5 pings/s during GAS execution).
self._ping_interval: float = ping_interval
# Optional SNI rotation pool — picked round-robin on each new connect. # Optional SNI rotation pool — picked round-robin on each new connect.
# Falls back to the single sni_host if no pool is given. # Falls back to the single sni_host if no pool is given.
self._sni_hosts: list[str] = [h for h in (sni_hosts or []) if h] or [sni_host] self._sni_hosts: list[str] = [h for h in (sni_hosts or []) if h] or [sni_host]
@@ -132,6 +145,13 @@ class H2Transport:
self._sni_idx += 1 self._sni_idx += 1
self.sni_host = sni # kept for backward-compat logging self.sni_host = sni # kept for backward-compat logging
# no_sni mode: omit SNI extension entirely so DPI can't classify by
# hostname. Forces cert-verification off (default cert won't match).
if self._no_sni:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
sni = None # server_hostname=None → no SNI extension sent
# Create raw TCP socket with TCP_NODELAY BEFORE TLS handshake. # Create raw TCP socket with TCP_NODELAY BEFORE TLS handshake.
# Nagle's algorithm can delay small writes (H2 frames) by up to 200ms # Nagle's algorithm can delay small writes (H2 frames) by up to 200ms
# waiting to coalesce — TCP_NODELAY forces immediate send. # waiting to coalesce — TCP_NODELAY forces immediate send.
@@ -320,14 +340,28 @@ class H2Transport:
await self._flush() await self._flush()
# Wait for complete response # Wait for complete response, sending H2 PING frames concurrently.
# The PING/ACK bidirectional traffic masks the Apps Script execution
# silence that Iran DPI uses to identify and throttle relay patterns.
ping_task = None
if self._ping_interval > 0:
ping_task = asyncio.create_task(
self._ping_keepalive(state, self._ping_interval)
)
try: try:
await asyncio.wait_for(state.done.wait(), timeout=timeout) await asyncio.wait_for(state.done.wait(), timeout=timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
if ping_task:
ping_task.cancel()
await asyncio.gather(ping_task, return_exceptions=True)
self._streams.pop(stream_id, None) self._streams.pop(stream_id, None)
raise TimeoutError( raise TimeoutError(
f"H2 stream {stream_id} timed out ({timeout}s)" f"H2 stream {stream_id} timed out ({timeout}s)"
) )
finally:
if ping_task:
ping_task.cancel()
await asyncio.gather(ping_task, return_exceptions=True)
self._streams.pop(stream_id, None) self._streams.pop(stream_id, None)
@@ -342,6 +376,32 @@ class H2Transport:
return state.status, state.headers, resp_body return state.status, state.headers, resp_body
async def _ping_keepalive(self, state: "_StreamState", interval: float):
    """Send H2 PING frames every *interval* seconds until *state* completes.

    The loop sleeps for *interval*, then — if the stream is still pending and
    the connection is up — queues one PING under the write lock and flushes
    it. The intent is to keep traffic flowing on the connection while the
    request waits for its response.

    Args:
        state: Stream state whose ``done`` event marks completion.
        interval: Seconds between pings. A value <= 0 disables pinging and
            the coroutine returns immediately.

    The coroutine ends quietly when ``state.done`` is set, when sending a
    ping fails, or when the task is cancelled.
    """
    # NOTE(review): every in-flight request runs its own ping loop, so the
    # PING rate on one connection scales with concurrent streams — confirm
    # this stays below the server's tolerance.
    if interval <= 0:
        # asyncio.sleep() with a non-positive delay returns immediately, so
        # without this guard the loop would send PINGs back-to-back.
        return
    opaque = b"dpi-ping"  # 8-byte PING opaque data
    try:
        while not state.done.is_set():
            await asyncio.sleep(interval)
            if state.done.is_set():
                break
            try:
                async with self._write_lock:
                    if self._h2 and self._connected:
                        self._h2.ping(opaque)
                        await self._flush()
            except Exception:
                break  # connection lost; let the main wait() handle it
    except asyncio.CancelledError:
        # Cancellation is the normal way the caller stops this helper.
        pass
def _send_body(self, stream_id: int, body: bytes): def _send_body(self, stream_id: int, body: bytes):
"""Send request body, respecting H2 flow control window. """Send request body, respecting H2 flow control window.
+27 -3
View File
@@ -21,7 +21,6 @@ classify_relay_error(raw) -> str
""" """
import base64 import base64
import codecs
import gzip import gzip
import json import json
import logging import logging
@@ -233,6 +232,15 @@ def parse_relay_json(data: dict, max_body_bytes: int) -> bytes:
resp_headers = data.get("h", {}) resp_headers = data.get("h", {})
resp_body = base64.b64decode(data.get("b", "")) resp_body = base64.b64decode(data.get("b", ""))
# Decompress relay-level gzip applied by Code.gs to shrink bytes-on-wire
# over DPI-shaped connections. The "gz" flag is set when Code.gs found
# that gzip saved space (all text content: JS, CSS, HTML, JSON).
if data.get("gz"):
try:
resp_body = gzip.decompress(resp_body)
except Exception as _exc:
log.debug("relay gz decompress failed: %s", _exc)
# ── Decompress if the target sent a compressed body ───────────────────────── # ── Decompress if the target sent a compressed body ─────────────────────────
# UrlFetchApp does NOT auto-decompress gzip/deflate responses, so if the # UrlFetchApp does NOT auto-decompress gzip/deflate responses, so if the
# client's Accept-Encoding header was forwarded and the server compressed # client's Accept-Encoding header was forwarded and the server compressed
@@ -304,7 +312,13 @@ def parse_relay_json(data: dict, max_body_bytes: int) -> bytes:
def extract_apps_script_user_html(text: str) -> str | None: def extract_apps_script_user_html(text: str) -> str | None:
"""Extract embedded user HTML from an Apps Script HTML-page response.""" """Extract embedded user HTML from an Apps Script HTML-page response.
Google's IFRAME_SANDBOX mode returns /exec responses wrapped in an HTML
page that includes a goog.script.init("...") call. The first argument is
a JS string literal (\\xNN hex escapes) containing a JSON payload with
a ``userHtml`` field that holds the actual relay response.
"""
marker = 'goog.script.init("' marker = 'goog.script.init("'
start = text.find(marker) start = text.find(marker)
if start == -1: if start == -1:
@@ -316,7 +330,17 @@ def extract_apps_script_user_html(text: str) -> str | None:
encoded = text[start:end] encoded = text[start:end]
try: try:
decoded = codecs.decode(encoded, "unicode_escape") # The JS string uses \xNN hex escapes and \/ for forward-slash.
# Also unescape \\ → \ (JS double-backslash = literal backslash).
# Order: hex first, then double-backslash, then \/ so that
# \\/ (JS for literal-backslash + /) works correctly.
decoded = re.sub(
r'\\x([0-9a-fA-F]{2})',
lambda m: chr(int(m.group(1), 16)),
encoded,
)
decoded = decoded.replace("\\\\", "\\")
decoded = decoded.replace("\\/", "/")
payload = json.loads(decoded) payload = json.loads(decoded)
except Exception: except Exception:
return None return None