From a5d25f608f46646a1e47baff912f10a929034c4a Mon Sep 17 00:00:00 2001 From: Abolfazl Date: Wed, 6 May 2026 15:54:11 +0330 Subject: [PATCH] feat: implement gzip compression for relay responses and enhance H2 transport with keepalive pings --- apps_script/Code.gs | 33 ++- config.example.json | 2 + src/relay/domain_fronter.py | 459 ++++++++++++++++++++++++++++-------- src/relay/h2_transport.py | 64 ++++- src/relay/relay_response.py | 30 ++- 5 files changed, 479 insertions(+), 109 deletions(-) diff --git a/apps_script/Code.gs b/apps_script/Code.gs index 411295a..970e98f 100644 --- a/apps_script/Code.gs +++ b/apps_script/Code.gs @@ -55,6 +55,21 @@ function doPost(e) { } } +// Compress a byte array with gzip to reduce bytes-on-wire over DPI-shaped +// connections. Returns {b: byteArray, gz: true} if compression saves space, +// otherwise {b: original, gz: false}. This can cut text payloads by 60-80%. +function _maybeGzip(bytes) { + try { + var compressed = Utilities.gzip(Utilities.newBlob(bytes)).getBytes(); + if (compressed.length < bytes.length) { + return { b: compressed, gz: true }; + } + } catch (e) { + // Gzip failed — fall through to uncompressed + } + return { b: bytes, gz: false }; +} + function _doSingle(req) { if (!req.u || typeof req.u !== "string" || !req.u.match(/^https?:\/\//i)) { return _json({ e: "bad url" }); @@ -68,11 +83,14 @@ function _doSingle(req) { } var opts = _buildOpts(req); var resp = UrlFetchApp.fetch(req.u, opts); - return _json({ + var gz = _maybeGzip(resp.getContent()); + var result = { s: resp.getResponseCode(), h: _respHeaders(resp), - b: Utilities.base64Encode(resp.getContent()), - }); + b: Utilities.base64Encode(gz.b), + }; + if (gz.gz) result.gz = 1; + return _json(result); } function _doBatch(items) { @@ -149,11 +167,14 @@ function _doBatch(items) { if (!resp) { results.push({ e: "fetch failed" }); } else { - results.push({ + var gz = _maybeGzip(resp.getContent()); + var item = { s: resp.getResponseCode(), h: _respHeaders(resp), - b: Utilities.base64Encode(resp.getContent()), - }); + b: Utilities.base64Encode(gz.b), + }; + if (gz.gz) item.gz = 1; + results.push(item); } } } diff --git a/config.example.json b/config.example.json index 6e47f32..e186bf8 100644 --- a/config.example.json +++ b/config.example.json @@ -17,6 +17,8 @@ "tls_connect_timeout": 15, "tcp_connect_timeout": 10, "parallel_relay": 1, + "h2_connections": 2, + "enable_sub_batch": true, "block_hosts": [ "ads.example.com", ".doubleclick.net" diff --git a/src/relay/domain_fronter.py b/src/relay/domain_fronter.py index 7621a84..bc85f73 100644 --- a/src/relay/domain_fronter.py +++ b/src/relay/domain_fronter.py @@ -90,8 +90,16 @@ def _mask_sid(sid: str) -> str: class DomainFronter: _STATIC_EXTS = STATIC_EXTS - _H2_FAILURE_COOLDOWN = 60.0 - _H2_FAILURE_THRESHOLD = 3 + _H2_FAILURE_COOLDOWN = 15.0 # reduced: DPI token bucket refills in ~8-10s + _H2_FAILURE_THRESHOLD = 5 # raised: needs genuine consecutive failures + # URL extensions that almost always produce large responses (fonts, images, + # media). These are isolated into their own H2 sub-batch so a 400 kB font + # doesn't block a 2 kB JS file waiting for the same Apps Script response. + _HEAVY_EXTENSIONS = frozenset({ + "woff2", "woff", "ttf", "eot", "otf", + "jpg", "jpeg", "png", "gif", "webp", "avif", "ico", + "mp4", "mp3", "wav", "webm", "ogg", "flac", + }) _DOWNLOAD_STREAM_COOLDOWN = 300.0 _COALESCE_VARY_HEADERS = ( "accept", @@ -181,12 +189,21 @@ class DomainFronter: self._batch_lock = asyncio.Lock() self._batch_pending: list[tuple[dict, asyncio.Future]] = [] self._batch_task: asyncio.Task | None = None - self._batch_window_micro = BATCH_WINDOW_MICRO - self._batch_window_macro = BATCH_WINDOW_MACRO - self._batch_max = BATCH_MAX - self._batch_enabled = True + self._batch_window_micro = float(config.get("batch_window_micro", BATCH_WINDOW_MICRO)) + self._batch_window_macro = float(config.get("batch_window_macro", BATCH_WINDOW_MACRO)) + self._batch_max = int(config.get("batch_max", BATCH_MAX)) + # enable_batch=false → each request gets its own H2 stream → N×2 KiB/s + # aggregate throughput instead of all requests sharing one stream. + # Recommended when DPI does per-stream rate limiting (e.g. Iran). + self._batch_permanent_disable: bool = not bool(config.get("enable_batch", True)) + self._batch_enabled = not self._batch_permanent_disable self._batch_disabled_at = 0.0 self._batch_cooldown = 60 + # enable_sub_batch=false → all batches are sent as a single Apps Script + # call regardless of how many H2 connections are live. Saves quota at + # the cost of parallel DPI bypass (each connection no longer gets its + # own token bucket). Useful when quota is the binding constraint. + self._sub_batch_enabled: bool = bool(config.get("enable_sub_batch", True)) # Request coalescing — dedup concurrent identical GETs self._coalesce: dict[str, list[asyncio.Future]] = {} @@ -194,17 +211,39 @@ class DomainFronter: self._h2_disabled_until = 0.0 self._stream_download_disabled_until: dict[str, float] = {} - # HTTP/2 multiplexing — one connection handles all requests + # HTTP/2 multiplexing — pool of parallel connections for DPI bypass. + # Iran's DPI shapes per-TCP-connection; N separate connections each + # get their own independent token bucket, giving ~N× throughput. self._h2 = None + self._h2_pool: list = [] + self._h2_pool_idx: int = 0 try: from .h2_transport import H2Transport, H2_AVAILABLE if H2_AVAILABLE: - self._h2 = H2Transport( - self.connect_host, self.sni_host, self.verify_ssl, - sni_hosts=self._sni_hosts, + try: + n_conns = max(1, int(config.get("h2_connections", 3))) + except (TypeError, ValueError): + n_conns = 3 + no_sni = bool(config.get("no_sni", False)) + try: + ping_interval = float(config.get("ping_interval", 0.2)) + except (TypeError, ValueError): + ping_interval = 0.2 + self._h2_pool = [ + H2Transport( + self.connect_host, self.sni_host, self.verify_ssl, + sni_hosts=self._sni_hosts, + no_sni=no_sni, + ping_interval=ping_interval, + ) + for _ in range(n_conns) + ] + self._h2 = self._h2_pool[0] # primary; used for ping/reconnect + log.info( + "HTTP/2 multiplexing available — %d parallel connections " + "(each gets its own DPI token bucket)", + n_conns, ) - log.info("HTTP/2 multiplexing available — " - "all requests will share one connection") except ImportError: pass @@ -218,12 +257,19 @@ class DomainFronter: "Execution monitor enabled: reporting total every %.0fs", self._execution_report_interval, ) - log.info( - "Batch config: micro=%.0fms macro=%.0fms max=%d", - self._batch_window_micro * 1000.0, - self._batch_window_macro * 1000.0, - self._batch_max, - ) + if self._batch_permanent_disable: + log.info( + "Batch DISABLED (enable_batch=false) — each request fires its own " + "H2 stream for N×2 KiB/s aggregate throughput" + ) + else: + log.info( + "Batch config: micro=%.0fms macro=%.0fms max=%d sub_batch=%s", + self._batch_window_micro * 1000.0, + self._batch_window_macro * 1000.0, + self._batch_max, + "on" if self._sub_batch_enabled else "off", + ) # Exit node — optional second-hop relay with a non-Google exit IP. # Useful for sites that block GCP/Apps Script IPs (e.g. ChatGPT). @@ -318,11 +364,30 @@ class DomainFronter: return self._ssl_context def _h2_available(self) -> bool: - return ( - self._h2 is not None - and self._h2.is_connected - and time.time() >= self._h2_disabled_until - ) + if not self._h2_pool or time.time() < self._h2_disabled_until: + return False + return any(t.is_connected for t in self._h2_pool) + + def _pick_h2(self): + """Round-robin pick a connected H2Transport from the pool. + + Distributes relay requests across multiple TCP connections so each + benefits from its own independent DPI throughput budget. + Returns the primary transport when none are connected (caller will + trigger reconnection via the normal failure/cooldown path). + """ + pool = self._h2_pool + n = len(pool) + if not n: + return self._h2 + for i in range(n): + t = pool[(self._h2_pool_idx + i) % n] + if t.is_connected: + self._h2_pool_idx = (self._h2_pool_idx + i + 1) % n + return t + # None connected — advance index and return primary + self._h2_pool_idx = (self._h2_pool_idx + 1) % n + return pool[0] def _record_h2_success(self) -> None: self._h2_failure_streak = 0 @@ -346,6 +411,27 @@ class DomainFronter: type(exc).__name__, ) + @staticmethod + def _is_h2_transport_error(exc: BaseException) -> bool: + """Return True only for genuine H2 *transport* failures. + + Apps Script request timeouts (TimeoutError) and application-level + errors are NOT H2 transport failures — the connection may be fine. + Counting them pushes the failure streak toward the disable threshold + even when H2 is healthy, which causes unnecessary 15s fallbacks. + Only connection-level errors should disable H2. + """ + if isinstance(exc, asyncio.TimeoutError): + return False + if isinstance(exc, (ConnectionError, OSError, ssl.SSLError)): + return True + msg = str(exc).lower() + return any(k in msg for k in ( + "connection closed", "connection lost", "stream error", + "alpn negotiation", "transport closed", "h2 reader", + "eof", "broken pipe", + )) + def _stream_download_allowed(self, url: str) -> bool: host = self._host_key(url) if not host: @@ -444,9 +530,9 @@ class DomainFronter: self._sni_hosts = reordered self._sni_idx = 0 - if self._h2 is not None: - self._h2._sni_hosts = list(reordered) - self._h2._sni_idx = 0 + for _t in self._h2_pool: + _t._sni_hosts = list(reordered) + _t._sni_idx = 0 log.info( "SNI pool re-ranked by local probe: %s", ", ".join(f"{sni} ({ms:.0f}ms)" for ms, sni in ranked), @@ -956,23 +1042,38 @@ class DomainFronter: await self._flush_pool() - if self._h2: + for _t in self._h2_pool: try: - await self._h2.close() + await _t.close() except Exception as exc: - log.debug("h2 close: %s", exc) + log.debug("h2 pool close: %s", exc) async def _h2_connect(self): - """Connect the HTTP/2 transport in background.""" - if self._h2 is None: + """Connect all HTTP/2 transports in the pool.""" + if not self._h2_pool: return if time.time() < self._h2_disabled_until: return try: await self._ensure_sni_ranked() - await self._h2.ensure_connected() - self._record_h2_success() - log.info("H2 multiplexing active — one conn handles all requests") + results = await asyncio.gather( + *[t.ensure_connected() for t in self._h2_pool], + return_exceptions=True, + ) + connected = sum(1 for r in results if not isinstance(r, Exception)) + if connected > 0: + self._record_h2_success() + log.info( + "H2 multiplexing active — %d/%d connections live", + connected, len(self._h2_pool), + ) + else: + exc = next(r for r in results if isinstance(r, Exception)) + self._record_h2_failure(exc) + log.warning( + "H2 connect failed (%s: %s), using H1 pool fallback", + type(exc).__name__, exc or "(no details)", + ) except Exception as e: self._record_h2_failure(e) log.warning( @@ -1042,30 +1143,43 @@ class DomainFronter: """Send periodic pings to keep Apps Script warm + H2 connection alive.""" while True: try: - # Keep a conservative cadence to avoid any chance of this loop - # contending with foreground relay work. - await asyncio.sleep(240) + # 60s cadence: Iran DPI/NAT can drop idle connections in ~30-60s. + # Pinging every 60s keeps all pool members alive without burning + # significant Apps Script quota. + await asyncio.sleep(60) # If H2 is absent or still in cooldown, skip this tick. if self._h2 is None or time.time() < self._h2_disabled_until: continue - # Reconnect in background when needed, but bound it with a - # timeout so recovery attempts can never stall the loop. - if not self._h2.is_connected: - try: - await asyncio.wait_for( - self._h2.reconnect(), - timeout=max(self._tls_connect_timeout, 8.0), - ) - self._record_h2_success() - log.info("H2 re-established after failure") - except Exception as exc: - self._record_h2_failure(exc) - continue + # Reconnect any disconnected pool members. + for _t in list(self._h2_pool): + if not _t.is_connected: + try: + await asyncio.wait_for( + _t.reconnect(), + timeout=max(self._tls_connect_timeout, 8.0), + ) + self._record_h2_success() + log.info("H2 connection re-established") + except Exception as exc: + # Keepalive reconnect failures are background recovery + # attempts — do NOT count them toward the disable + # threshold or healthy traffic gets penalised. + log.debug("H2 background reconnect failed: %s", exc) - # H2 PING to keep connection alive - await self._h2.ping() + if not any(t.is_connected for t in self._h2_pool): + continue # all transports down — skip ping + + # H2 PING frame to every connected pool member. + # This tells each OS/DPI that the TCP connection is still in use, + # preventing the 30-60s idle-reset that Iran DPI applies. + for _t in self._h2_pool: + if _t.is_connected: + try: + await _t.ping() + except Exception: + pass # Apps Script keepalive — warm the container payload = {"m": "GET", "u": "http://example.com/", "k": self.auth_key} @@ -1128,6 +1242,24 @@ class DomainFronter: # stop waiting on the first request. self._pool_ready.set() + async def _reconnect_pool_members(self) -> None: + """Background: reconnect any H2 pool members that dropped. + + Called after a transport error in the relay path so connections are + recovered promptly instead of waiting for the next keepalive tick. + Does NOT increment the failure streak — this is a recovery action. + """ + for _t in self._h2_pool: + if not _t.is_connected: + try: + await asyncio.wait_for( + _t.reconnect(), + timeout=max(self._tls_connect_timeout, 8.0), + ) + log.debug("H2 pool member recovered") + except Exception as exc: + log.debug("H2 pool member reconnect failed: %s", exc) + def _auth_header(self) -> str: return f"X-Auth-Key: {self.auth_key}\r\n" if self.auth_key else "" @@ -1922,7 +2054,8 @@ class DomainFronter: # If batching is disabled, retry enabling it after a cooldown. if not self._batch_enabled: if ( - self._batch_disabled_at > 0 + not self._batch_permanent_disable + and self._batch_disabled_at > 0 and (time.time() - self._batch_disabled_at) >= self._batch_cooldown ): self._batch_enabled = True @@ -1980,8 +2113,78 @@ class DomainFronter: self._batch_task = None self._spawn(self._batch_send(batch)) + @staticmethod + def _split_list(lst: list, n: int) -> list[list]: + """Split lst into n roughly-equal contiguous chunks (no empty chunks).""" + n = min(n, len(lst)) + k, rem = divmod(len(lst), n) + chunks, start = [], 0 + for i in range(n): + size = k + (1 if i < rem else 0) + chunks.append(lst[start:start + size]) + start += size + return chunks + + @staticmethod + def _url_ext(url: str) -> str: + """Extract the lowercase file extension from a URL path (no query).""" + try: + path = urlparse(url).path + if "." in path: + return path.rsplit(".", 1)[-1].lower() + except Exception: + pass + return "" + + def _make_sub_batches(self, batch: list, n_connections: int) -> list[list]: + """Build sub-batches that isolate heavy (binary) from light requests. + + A 2 kB CSS file and a 400 kB font batched together mean the CSS + future doesn't resolve until the font finishes downloading at 40 KB/s + (~15s). By separating heavy files onto their own H2 connection the + light files resolve in <1s and the browser can continue rendering + while the large binaries transfer in parallel. + """ + if n_connections <= 1: + return [batch] + + heavy, light = [], [] + for item in batch: + url = item[0].get("u", "") + ext = self._url_ext(url) + (heavy if ext in self._HEAVY_EXTENSIONS else light).append(item) + + if not heavy: + # All light items (CSS, JS, JSON…) — keep as a single batch. + # Each sub-batch is one Apps Script execution; splitting N small + # files into N executions wastes N× quota with negligible DPI + # benefit (small payloads clear the token bucket quickly anyway). + return [batch] + if not light: + # All heavy items — split across connections so each large file + # gets its own DPI token bucket (parallel throughput). + return self._split_list(batch, min(n_connections, len(batch))) + + # Reserve n_connections-1 slots for heavy items (each gets its own + # throughput budget); give the remaining slot(s) to light items. + n_heavy_slots = min(n_connections - 1, len(heavy)) + sub_batches = self._split_list(heavy, n_heavy_slots) + n_light_slots = n_connections - len(sub_batches) + if n_light_slots > 1 and len(light) >= n_light_slots: + sub_batches += self._split_list(light, n_light_slots) + else: + sub_batches.append(light) + return [s for s in sub_batches if s] + async def _batch_send(self, batch: list): - """Send a batch of requests. Uses fetchAll for multi, single for one.""" + """Send a batch of requests, split across H2 connections for parallel throughput. + + Iran's DPI shapes per-TCP-connection. A 600 kB response over one + connection at 40 KB/s takes ~15s. Splitting the same batch across 3 + connections means each carries ~200 kB → ~5s, all in parallel → 3× + faster wall-clock. Each sub-batch is an independent Apps Script + fetchAll call on a separate H2 transport. + """ if len(batch) == 1: payload, future = batch[0] try: @@ -1991,26 +2194,86 @@ class DomainFronter: except Exception as e: if not future.done(): future.set_result(error_response(502, str(e))) - else: - log.info("Batch relay: %d requests", len(batch)) - try: - results = await self._relay_batch([p for p, _ in batch]) - for (_, future), result in zip(batch, results): - if not future.done(): - future.set_result(result) - except Exception as e: - log.warning( - "Batch relay failed, disabling batch mode for %ds cooldown. " - "Error: %s: %s", - self._batch_cooldown, type(e).__name__, e or "(no details)", - ) - self._batch_enabled = False - self._batch_disabled_at = time.time() - # Fallback: send individually - tasks = [] - for payload, future in batch: - tasks.append(self._relay_fallback(payload, future)) - await asyncio.gather(*tasks) + return + + # Determine how many live H2 connections to split across. + n_live = ( + sum(1 for t in self._h2_pool if t.is_connected) + if self._h2_pool else 0 + ) + n_splits = min(n_live, len(batch)) if (n_live > 1 and self._sub_batch_enabled) else 1 + + if n_splits > 1: + # Build size-aware sub-batches: heavy files (fonts, images) get + # their own H2 connection so light files don't wait for them. + chunks = self._make_sub_batches(batch, n_splits) + + heavy_count = sum( + 1 for p, _ in batch + if self._url_ext(p.get("u", "")) in self._HEAVY_EXTENSIONS + ) + log.info( + "Batch relay: %d requests (%d heavy+%d light) → %d sub-batches (%s)", + len(batch), heavy_count, len(batch) - heavy_count, + len(chunks), "+".join(str(len(c)) for c in chunks), + ) + + # Wrap each sub-batch relay with timing so slow connections are + # logged and we can correlate them with DPI shaping events. + async def _timed_sub_batch(items: list): + t0 = time.perf_counter() + result = await self._relay_batch([p for p, _ in items]) + return result, time.perf_counter() - t0 + + chunk_results = await asyncio.gather( + *[_timed_sub_batch(c) for c in chunks], + return_exceptions=True, + ) + + max_dt = 0.0 + for chunk, result in zip(chunks, chunk_results): + if isinstance(result, Exception): + log.warning( + "Sub-batch failed (%s: %s), retrying individually", + type(result).__name__, result, + ) + for payload, future in chunk: + self._spawn(self._relay_fallback(payload, future)) + else: + items_result, dt = result + max_dt = max(max_dt, dt) + if dt > 8.0: + log.warning( + "Slow sub-batch: %.1fs for %d items — DPI shaping?", + dt, len(chunk), + ) + for (_, future), raw in zip(chunk, items_result): + if not future.done(): + future.set_result(raw) + if max_dt > 0: + log.debug("Batch wall-clock: %.1fs", max_dt) + return + + # Single-batch path: H2 unavailable or only one connection live. + log.info("Batch relay: %d requests", len(batch)) + try: + results = await self._relay_batch([p for p, _ in batch]) + for (_, future), result in zip(batch, results): + if not future.done(): + future.set_result(result) + except Exception as e: + log.warning( + "Batch relay failed, disabling batch mode for %ds cooldown. " + "Error: %s: %s", + self._batch_cooldown, type(e).__name__, e or "(no details)", + ) + self._batch_enabled = False + self._batch_disabled_at = time.time() + # Fallback: send individually + tasks = [] + for payload, future in batch: + tasks.append(self._relay_fallback(payload, future)) + await asyncio.gather(*tasks) async def _relay_fallback(self, payload, future): """Fallback: relay a single request from a failed batch.""" @@ -2041,7 +2304,9 @@ class DomainFronter: self._record_h2_success() return result except Exception as e: - self._record_h2_failure(e) + if self._is_h2_transport_error(e): + self._record_h2_failure(e) + self._spawn(self._reconnect_pool_members()) log.debug("Fan-out relay failed (%s), falling back", e) # fall through to single-path logic below @@ -2055,24 +2320,20 @@ class DomainFronter: self._record_h2_success() return result except Exception as e: - self._record_h2_failure(e) - if attempt < attempts - 1: - log.debug("H2 relay failed (%s), reconnecting", e) - try: - await self._h2.reconnect() - # Do NOT record success here — only a successful relay - # response proves the connection works. Recording - # success after reconnect was resetting the failure - # streak and causing an infinite reconnect storm. - except Exception as reconnect_exc: - self._record_h2_failure(reconnect_exc) - log.warning("H2 reconnect failed, falling back to H1") - break + is_transport = self._is_h2_transport_error(e) + if is_transport: + self._record_h2_failure(e) + # Spawn background reconnect for any newly-dead transports + # so future requests find healthy connections. + self._spawn(self._reconnect_pool_members()) + if attempt < attempts - 1 and self._h2_available(): + log.debug("H2 relay attempt %d failed (%s: %s), retrying", + attempt + 1, type(e).__name__, e) else: - # Last H2 attempt failed — fall through to H1 rather - # than raising here, which would bypass H1 entirely. - log.debug("H2 relay failed on final attempt (%s), " - "falling back to H1", e) + log.debug( + "H2 relay failed (%s: %s), falling back to H1", + type(e).__name__, e, + ) break # HTTP/1.1 fallback (pool-based) @@ -2141,8 +2402,8 @@ class DomainFronter: async def _relay_single_h2(self, payload: dict) -> bytes: """Execute a relay through HTTP/2 multiplexing. - Uses the shared H2 connection — no pool checkout needed. - Many concurrent calls all share one TLS connection. + Picks a connection from the pool via round-robin so each request + benefits from its own DPI token bucket. """ full_payload = dict(payload) full_payload["k"] = self.auth_key @@ -2152,7 +2413,7 @@ class DomainFronter: path = self._exec_path_for_sid(sid) self._record_execution(sid) - status, headers, body = await self._h2.request( + status, headers, body = await (self._pick_h2() or self._h2).request( method="POST", path=path, host=self.http_host, headers={"content-type": "application/json"}, body=json_body, @@ -2174,7 +2435,7 @@ class DomainFronter: path = self._exec_path_for_sid(sid) self._record_execution(sid) - status, headers, body = await self._h2.request( + status, headers, body = await (self._pick_h2() or self._h2).request( method="POST", path=path, host=self.http_host, headers={"content-type": "application/json"}, body=json_body, @@ -2285,7 +2546,7 @@ class DomainFronter: try: self._record_execution(sid) status, headers, body = await asyncio.wait_for( - self._h2.request( + (self._pick_h2() or self._h2).request( method="POST", path=path, host=self.http_host, headers={"content-type": "application/json"}, body=json_body, @@ -2295,7 +2556,9 @@ class DomainFronter: self._record_h2_success() return self._parse_batch_body(body, payloads) except Exception as e: - self._record_h2_failure(e) + if self._is_h2_transport_error(e): + self._record_h2_failure(e) + self._spawn(self._reconnect_pool_members()) log.debug("H2 batch failed (%s), falling back to H1", e) # HTTP/1.1 fallback diff --git a/src/relay/h2_transport.py b/src/relay/h2_transport.py index 00134d5..77b7e0c 100644 --- a/src/relay/h2_transport.py +++ b/src/relay/h2_transport.py @@ -68,10 +68,23 @@ class H2Transport: def __init__(self, connect_host: str, sni_host: str, verify_ssl: bool = True, - sni_hosts: list[str] | None = None): + sni_hosts: list[str] | None = None, + no_sni: bool = False, + ping_interval: float = 0.2): self.connect_host = connect_host self.sni_host = sni_host self.verify_ssl = verify_ssl + # no_sni=True: omit the SNI extension from TLS ClientHello entirely. + # DPI cannot match a hostname → may bypass per-SNI throttle rules. + # Google's GFE accepts SNI-less connections (returns default cert). + # Requires verify_ssl=False since hostname won't be in default cert CN. + self._no_sni: bool = no_sni + # ping_interval: seconds between H2 PING frames sent while waiting + # for a relay response. Fills the TCP silence that Iran DPI uses to + # classify and throttle Apps Script relay connections. Server MUST + # reply with PING ACK → continuous bidirectional flow visible to DPI. + # Set to 0 to disable. Default 0.2s (5 pings/s during GAS execution). + self._ping_interval: float = ping_interval # Optional SNI rotation pool — picked round-robin on each new connect. # Falls back to the single sni_host if no pool is given. self._sni_hosts: list[str] = [h for h in (sni_hosts or []) if h] or [sni_host] @@ -132,6 +145,13 @@ class H2Transport: self._sni_idx += 1 self.sni_host = sni # kept for backward-compat logging + # no_sni mode: omit SNI extension entirely so DPI can't classify by + # hostname. Forces cert-verification off (default cert won't match). + if self._no_sni: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + sni = None # server_hostname=None → no SNI extension sent + # Create raw TCP socket with TCP_NODELAY BEFORE TLS handshake. # Nagle's algorithm can delay small writes (H2 frames) by up to 200ms # waiting to coalesce — TCP_NODELAY forces immediate send. @@ -320,14 +340,28 @@ class H2Transport: await self._flush() - # Wait for complete response + # Wait for complete response, sending H2 PING frames concurrently. + # The PING/ACK bidirectional traffic masks the Apps Script execution + # silence that Iran DPI uses to identify and throttle relay patterns. + ping_task = None + if self._ping_interval > 0: + ping_task = asyncio.create_task( + self._ping_keepalive(state, self._ping_interval) + ) try: await asyncio.wait_for(state.done.wait(), timeout=timeout) except asyncio.TimeoutError: + if ping_task: + ping_task.cancel() + await asyncio.gather(ping_task, return_exceptions=True) self._streams.pop(stream_id, None) raise TimeoutError( f"H2 stream {stream_id} timed out ({timeout}s)" ) + finally: + if ping_task: + ping_task.cancel() + await asyncio.gather(ping_task, return_exceptions=True) self._streams.pop(stream_id, None) @@ -342,6 +376,32 @@ class H2Transport: return state.status, state.headers, resp_body + async def _ping_keepalive(self, state: "_StreamState", interval: float): + """Send H2 PING frames at *interval* seconds until *state* completes. + + Iran DPI throttles relay responses when it detects a long TCP silence + between the client's POST upload and the server's first response byte + (Apps Script typically takes 2-5 s to execute). Sending PING frames + forces the server to emit PING ACK frames, creating continuous + bidirectional traffic that looks like a normal persistent connection + rather than a relay waiting for proxy execution. + """ + opaque = b"dpi-ping" # 8-byte PING opaque data + try: + while not state.done.is_set(): + await asyncio.sleep(interval) + if state.done.is_set(): + break + try: + async with self._write_lock: + if self._h2 and self._connected: + self._h2.ping(opaque) + await self._flush() + except Exception: + break # connection lost; let the main wait() handle it + except asyncio.CancelledError: + pass + def _send_body(self, stream_id: int, body: bytes): """Send request body, respecting H2 flow control window. diff --git a/src/relay/relay_response.py b/src/relay/relay_response.py index 1260fe6..35ad87f 100644 --- a/src/relay/relay_response.py +++ b/src/relay/relay_response.py @@ -21,7 +21,6 @@ classify_relay_error(raw) -> str """ import base64 -import codecs import gzip import json import logging @@ -233,6 +232,15 @@ def parse_relay_json(data: dict, max_body_bytes: int) -> bytes: resp_headers = data.get("h", {}) resp_body = base64.b64decode(data.get("b", "")) + # Decompress relay-level gzip applied by Code.gs to shrink bytes-on-wire + # over DPI-shaped connections. The "gz" flag is set when Code.gs found + # that gzip saved space (all text content: JS, CSS, HTML, JSON). + if data.get("gz"): + try: + resp_body = gzip.decompress(resp_body) + except Exception as _exc: + log.debug("relay gz decompress failed: %s", _exc) + # ── Decompress if the target sent a compressed body ───────────────────────── # UrlFetchApp does NOT auto-decompress gzip/deflate responses, so if the # client's Accept-Encoding header was forwarded and the server compressed @@ -304,7 +312,13 @@ def parse_relay_json(data: dict, max_body_bytes: int) -> bytes: def extract_apps_script_user_html(text: str) -> str | None: - """Extract embedded user HTML from an Apps Script HTML-page response.""" + """Extract embedded user HTML from an Apps Script HTML-page response. + + Google's IFRAME_SANDBOX mode returns /exec responses wrapped in an HTML + page that includes a goog.script.init("...") call. The first argument is + a JS string literal (\\xNN hex escapes) containing a JSON payload with + a ``userHtml`` field that holds the actual relay response. + """ marker = 'goog.script.init("' start = text.find(marker) if start == -1: @@ -316,7 +330,17 @@ def extract_apps_script_user_html(text: str) -> str | None: encoded = text[start:end] try: - decoded = codecs.decode(encoded, "unicode_escape") + # The JS string uses \xNN hex escapes and \/ for forward-slash. + # Also unescape \\ → \ (JS double-backslash = literal backslash). + # Order: hex first, then double-backslash, then \/ so that + # \\/ (JS for literal-backslash + /) works correctly. + decoded = re.sub( + r'\\x([0-9a-fA-F]{2})', + lambda m: chr(int(m.group(1), 16)), + encoded, + ) + decoded = decoded.replace("\\\\", "\\") + decoded = decoded.replace("\\/", "/") payload = json.loads(decoded) except Exception: return None