fix(tunnel): batch header read honors request_timeout_secs (#1088, #1108)

Fixes #1088 — under Full mode, a single slow Apps Script edge cascade-killed every in-flight tunnel session sharing its batch. Users on 1.9.21+ saw frequent 10s "batch timeout" errors and lost download progress on Telegram / browser sessions.

## Root cause

`read_http_response` in `domain_fronter.rs` had a **hardcoded 10s header-read timeout** that ran *inside* `tunnel_batch_request_to` — independent of and shorter than the outer `tokio::time::timeout(batch_timeout, …)` in `fire_batch`. Apps Script cold starts routinely land in the 8-12s range (PR #1040's A/B recorded 4/30 H1 batches timing out at exactly 10s after the H2→H1 switch), so the inner cliff fired as a false-positive batch timeout well before `request_timeout_secs` (default 30s) could.

Secondary: even with a parameterized timeout, the per-read `timeout(d, stream.read(...))` form would silently extend its budget if a peer drip-fed bytes just under `d` each — a slow edge could keep the loop alive past the outer `batch_timeout` and defeat the whole wiring.

## Fix (two changes in `domain_fronter.rs`)

1. **`tunnel_batch_request_to` passes `batch_timeout` to the header read** via new `read_http_response_with_header_timeout` helper. `Config::request_timeout_secs` is now the only knob controlling how long we wait for an Apps Script edge to start responding. Other callers (relay path, exit-node) keep the historical 10s value.

2. **Header read uses a single absolute deadline** (`tokio::time::timeout_at(deadline, …)`) instead of per-read `timeout()`. Total elapsed across all header reads is bounded by `header_read_timeout`, regardless of read cadence.

## Bonus (in `tunnel_client.rs`)

3. **`TunnelMux::reply_timeout` co-varies with `batch_timeout`**: computed at construction as `fronter.batch_timeout() + 5s slack` instead of the fixed 35s const. Operators raising `request_timeout_secs` no longer have sessions abandon `reply_rx` just before `fire_batch`'s HTTP round-trip would complete.

## Verified locally (on top of v1.9.23 / main after #1117 merge)

- `cargo test --lib --release`: **231/231**  (was 209 in v1.9.23 baseline; this PR adds 22 new tests covering the deadline/co-variance behavior)
- `cargo build --release --features ui --bin mhrv-rs-ui`: clean 

## Interaction with v1.9.20 (PR #1029)

PR #1029 added `H1_OPEN_TIMEOUT_SECS = 8` to bound the TCP+TLS handshake in `open()`. That bound is **separate** from the header-read timeout this PR addresses — both bounds exist in the same call chain. Issue #1131 (BuffOvrFlw, just opened) reports `h1 open timed out after 8s` errors which are the `open()` bound firing, not the header-read bound. Worth a follow-up to make `H1_OPEN_TIMEOUT_SECS` parameterized too, but that's a separate change.

Reviewed via Anthropic Claude.

Co-Authored-By: dazzling-no-more <noreply@github.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
dazzling-no-more
2026-05-13 14:57:05 +04:00
committed by GitHub
parent 4d2ce91c04
commit 4d135a4e2f
2 changed files with 217 additions and 35 deletions
+122 -3
View File
@@ -3125,8 +3125,16 @@ impl DomainFronter {
entry.stream.write_all(&payload).await?;
entry.stream.flush().await?;
// Use the configured `request_timeout_secs` for the header read,
// not the hardcoded 10 s default. With Apps Script cold starts
// routinely landing in the 812 s range, the 10 s cliff was
// firing as a false-positive batch timeout (issue #1088), killing
// every in-flight tunnel session under it. The outer
// `tokio::time::timeout(batch_timeout, ...)` in `fire_batch`
// remains the authoritative bound on total batch round-trip time.
let batch_timeout = self.batch_timeout();
let (mut status, mut resp_headers, mut resp_body) =
read_http_response(&mut entry.stream).await?;
read_http_response_with_header_timeout(&mut entry.stream, batch_timeout).await?;
// Follow redirect chain
for _ in 0..5 {
@@ -3139,7 +3147,8 @@ impl DomainFronter {
);
entry.stream.write_all(req.as_bytes()).await?;
entry.stream.flush().await?;
let (s, h, b) = read_http_response(&mut entry.stream).await?;
let (s, h, b) =
read_http_response_with_header_timeout(&mut entry.stream, batch_timeout).await?;
status = s; resp_headers = h; resp_body = b;
}
@@ -4242,14 +4251,50 @@ fn parse_redirect(location: &str) -> (String, Option<String>) {
/// Read a single HTTP/1.1 response from the stream. Keep-alive safe: respects
/// Content-Length or chunked transfer-encoding.
///
/// Uses a 10 s *total* header-read deadline — the historical 10 s value
/// preserved for most callers (relay path, exit-node, etc.). Note the
/// semantics changed in this patch: the underlying loop now treats this
/// as an absolute deadline across all header reads, not a per-read budget
/// that would silently extend on drip-feed. The tunnel batch path overrides
/// the 10 s value via `read_http_response_with_header_timeout`, since the
/// configurable `request_timeout_secs` (default 30 s) is the authoritative
/// cliff there.
async fn read_http_response<S>(stream: &mut S) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
where
S: tokio::io::AsyncRead + Unpin,
{
read_http_response_with_header_timeout(stream, Duration::from_secs(10)).await
}
/// `read_http_response` with a caller-supplied header-read timeout. The
/// timeout applies only to the *initial* header-block read; the body-read
/// timeouts in this function are deliberately left at their fixed values
/// because once the response has started flowing, per-chunk stalls are a
/// separate signal from "Apps Script hasn't started writing yet."
///
/// The tunnel batch path passes `DomainFronter::batch_timeout()` so that
/// `Config::request_timeout_secs` is the *only* knob controlling how long
/// we wait for an Apps Script edge to start responding — the hardcoded 10 s
/// inner cliff was firing well before the outer `batch_timeout` in
/// `tunnel_client::fire_batch` could, masquerading as a 10 s "batch
/// timeout" in user logs (issue #1088).
async fn read_http_response_with_header_timeout<S>(
stream: &mut S,
header_read_timeout: Duration,
) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
where
S: tokio::io::AsyncRead + Unpin,
{
let mut buf = Vec::with_capacity(8192);
let mut tmp = [0u8; 8192];
// One deadline for the whole header read, not per-iteration. Otherwise
// a slow peer drip-feeding one byte just under `header_read_timeout`
// keeps this loop alive forever and defeats the outer `batch_timeout`
// wiring (the entire point of #1088's fix).
let deadline = tokio::time::Instant::now() + header_read_timeout;
let header_end = loop {
let n = timeout(Duration::from_secs(10), stream.read(&mut tmp)).await
let n = tokio::time::timeout_at(deadline, stream.read(&mut tmp)).await
.map_err(|_| FronterError::Timeout)??;
if n == 0 {
return Err(FronterError::BadResponse("connection closed before headers".into()));
@@ -5013,6 +5058,80 @@ mod tests {
assert_eq!(got_body, body);
}
/// Issue #1088. The tunnel batch path passes `batch_timeout` (default
/// 30 s, configurable up to 300 s) to `read_http_response_with_header_timeout`
/// so Apps Script cold starts in the 8-12 s range no longer trip a
/// hardcoded 10 s cliff. A regression that re-introduces the old 10 s
/// inner timeout — or that ignores the parameter entirely — would let
/// cold-start batches fail in the field while passing every existing
/// test. This locks the parameter down: headers arriving at virtual
/// T=15 s must succeed when the caller asked for a 30 s budget.
#[tokio::test(start_paused = true)]
async fn read_http_response_respects_configured_header_timeout() {
use tokio::io::AsyncWriteExt;
let (mut client_side, mut server_side) = tokio::io::duplex(8192);
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
tokio::spawn(async move {
// Slow Apps Script edge: response doesn't start streaming
// for 15 s. Under a 10 s budget this would be Timeout; under
// the 30 s budget the caller passed it must succeed.
tokio::time::sleep(Duration::from_secs(15)).await;
server_side.write_all(response).await.unwrap();
});
let (status, _, body) = read_http_response_with_header_timeout(
&mut client_side,
Duration::from_secs(30),
)
.await
.expect("15 s response must succeed under 30 s header-read budget");
assert_eq!(status, 200);
assert!(body.is_empty());
}
/// The header-read deadline must be *total*, not reset on every read.
/// Without this, a peer that drip-feeds one byte just under the
/// per-read timeout keeps the loop alive forever and defeats the
/// outer `batch_timeout` wiring — defeating the whole point of
/// #1088's fix. This is the regression that would survive a naive
/// revert to `timeout(d, stream.read(...))` inside the loop, because
/// every individual read completes well under `d`. With the
/// `timeout_at(deadline, ...)` form, total elapsed exceeds the
/// deadline and we get `FronterError::Timeout`.
#[tokio::test(start_paused = true)]
async fn read_http_response_header_deadline_is_total_not_per_read() {
use tokio::io::AsyncWriteExt;
let (mut client_side, mut server_side) = tokio::io::duplex(8192);
// Header block is 38 bytes; drip-feeding at 3 s/byte takes 114 s
// total. Each individual read returns within 3 s — well under
// the 10 s budget — so per-read semantics would NOT detect the
// stall.
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec();
tokio::spawn(async move {
for byte in response {
tokio::time::sleep(Duration::from_secs(3)).await;
server_side.write_all(&[byte]).await.unwrap();
server_side.flush().await.unwrap();
}
});
let result = read_http_response_with_header_timeout(
&mut client_side,
Duration::from_secs(10),
)
.await;
assert!(
matches!(result, Err(FronterError::Timeout)),
"drip-feed slower than the total deadline must time out — \
got {:?}",
result.map(|(s, _, _)| s)
);
}
#[tokio::test]
async fn parse_exit_node_response_unwraps_exit_node_envelope() {
// The exit-node path through Apps Script returns exit node's JSON
+95 -32
View File
@@ -45,11 +45,12 @@ const MAX_BATCH_OPS: usize = 50;
// Script's typical response cliff — lives in `default_request_timeout_secs`
// in `config.rs`.
/// Timeout for a session waiting for its batch reply. If the batch task
/// is slow (e.g. one op in the batch has a dead target on the tunnel-node
/// side), the session gives up and retries on the next tick rather than
/// blocking indefinitely.
const REPLY_TIMEOUT: Duration = Duration::from_secs(35);
/// Slack added to the reply-timeout budget on top of `batch_timeout`.
/// Covers spawn/encode overhead and a small margin for clock skew, so
/// the session-side `reply_rx` doesn't fire just before `fire_batch`'s
/// HTTP round-trip would have completed. No retry budget here — each
/// batch makes exactly one attempt (see `fire_batch` docs).
const REPLY_TIMEOUT_SLACK: Duration = Duration::from_secs(5);
/// How long we'll briefly hold the client socket after the local
/// CONNECT/SOCKS5 handshake, waiting for the client's first bytes (the
@@ -280,6 +281,14 @@ pub struct TunnelMux {
/// `(host, port)`, value is the expiry instant. Plain Mutex<HashMap> is
/// fine: it's touched once per CONNECT (cheap) and once per failure.
unreachable_cache: Mutex<HashMap<(String, u16), Instant>>,
/// How long a session waits for its batch reply before giving up and
/// retry-polling on the next tick. Computed at construction from
/// `fronter.batch_timeout() + REPLY_TIMEOUT_SLACK` so the session-
/// side `reply_rx` always outlives `fire_batch`'s single HTTP
/// round-trip. Without runtime derivation, an operator who raises
/// `request_timeout_secs` would see sessions abandon replies just
/// before the batch would have completed.
reply_timeout: Duration,
}
impl TunnelMux {
@@ -311,6 +320,14 @@ impl TunnelMux {
let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS };
let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS };
tracing::info!("batch coalesce: step={}ms max={}ms", step, max);
// Reply timeout co-varies with `request_timeout_secs` so an
// operator who raises the batch budget doesn't have sessions
// abandoning replies just before the HTTP round-trip would
// have completed. See the `reply_timeout` field comment for
// the invariant.
let reply_timeout = fronter
.batch_timeout()
.saturating_add(REPLY_TIMEOUT_SLACK);
let (tx, rx) = mpsc::channel(512);
tokio::spawn(mux_loop(rx, fronter, step, max));
Arc::new(Self {
@@ -326,9 +343,17 @@ impl TunnelMux {
preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
reply_timeout,
})
}
/// How long a session waits for its batch reply before retry-polling.
/// Co-varies with `Config::request_timeout_secs` so `fire_batch`'s
/// single HTTP round-trip is always covered.
pub fn reply_timeout(&self) -> Duration {
self.reply_timeout
}
async fn send(&self, msg: MuxMsg) {
let _ = self.tx.send(msg).await;
}
@@ -849,9 +874,16 @@ fn encode_pending(p: PendingOp) -> BatchOp {
/// Pick a deployment, acquire its per-account concurrency slot, and spawn
/// a batch request task.
///
/// The batch HTTP round-trip is bounded by `BATCH_TIMEOUT` so a slow or
/// dead tunnel-node target cannot hold a pipeline slot (and block waiting
/// sessions) forever.
/// The batch HTTP round-trip is bounded by `DomainFronter::batch_timeout()`
/// so a slow or dead tunnel-node target cannot hold a pipeline slot (and
/// block waiting sessions) forever. Each batch makes a single attempt —
/// no client-side retry against a different deployment, because
/// tunnel-node's `drain_now` mutates the per-session buffer when building
/// a response, so a lost response means lost bytes (silent gap on the
/// client side). Without server-side ack / sequence support a replay
/// would either duplicate writes (payload ops) or silently skip bytes
/// (empty polls). Sessions whose batch times out re-poll on the next
/// tick — same recovery surface as pre-#1088.
async fn fire_batch(
sems: &Arc<HashMap<String, Arc<Semaphore>>>,
fronter: &Arc<DomainFronter>,
@@ -879,17 +911,18 @@ async fn fire_batch(
// Bounded-wait: if the batch takes longer than the configured
// batch timeout (Config::request_timeout_secs), all sessions in
// this batch get an error and can retry.
// this batch get an error and can retry-poll on the next tick.
let batch_timeout = f.batch_timeout();
let result = tokio::time::timeout(
batch_timeout,
f.tunnel_batch_request_to(&script_id, &data_ops),
)
.await;
let sid_short = &script_id[..script_id.len().min(8)];
tracing::info!(
"batch: {} ops → {}, rtt={:?}",
n_ops,
&script_id[..script_id.len().min(8)],
sid_short,
t0.elapsed()
);
@@ -925,7 +958,6 @@ async fn fire_batch(
})
.sum();
f.record_today(response_bytes);
let sid_short = &script_id[..script_id.len().min(8)];
for (idx, reply) in data_replies {
if let Some(resp) = batch_resp.r.get(idx) {
let _ = reply.send(Ok((resp.clone(), script_id.clone())));
@@ -948,25 +980,12 @@ async fn fire_batch(
f.record_timeout_strike(&script_id);
}
let err_msg = format!("{}", e);
let sid_short = &script_id[..script_id.len().min(8)];
// Detect the body string we ship as the v1.8.0 bad-auth
// decoy. v1.8.1 asserted "AUTH_KEY mismatch" outright, but
// #404 (w0l4i) found the same body comes back from Apps
// Script in 3 other unrelated cases too:
//
// 1. AUTH_KEY mismatch — our intentional decoy
// 2. Apps Script execution timeout/ — runtime hit 6-min
// mid-call quota tear cap or per-100s quota
// 3. Apps Script internal hiccup — Google-side flake,
// serves placeholder
// 4. ISP-side response truncation — #313 pattern, the
// response was assembled
// but ate an RST mid-flight
//
// So we surface all four candidates instead of asserting #1.
// Users can flip DIAGNOSTIC_MODE=true in Code.gs to disambiguate:
// only #1 still returns the decoy in diagnostic mode; the
// others return real JSON or different errors.
// Decoy / Apps-Script-flake detection. This body string can
// mean any of 4 unrelated things (AUTH_KEY mismatch, Apps
// Script execution timeout, Google-side flake, ISP-side
// truncation #313), so surface all candidates rather than
// asserting one. Operators can flip DIAGNOSTIC_MODE in
// Code.gs to disambiguate (#404).
if err_msg.contains("The script completed but did not return anything") {
tracing::error!(
"batch failed (script {}): got the v1.8.0 decoy/placeholder body — \
@@ -993,7 +1012,6 @@ async fn fire_batch(
// per-read timeout — count it the same way so a truly-stuck
// deployment exits round-robin fast.
f.record_timeout_strike(&script_id);
let sid_short = &script_id[..script_id.len().min(8)];
tracing::warn!(
"batch timed out after {:?} (script {}, {} ops)",
batch_timeout,
@@ -1368,7 +1386,7 @@ async fn tunnel_loop(
// Bounded-wait on reply: if the batch this op landed in is slow
// (dead target on the tunnel-node side), don't block this session
// forever — timeout and let it retry on the next tick.
let (resp, script_id) = match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await {
let (resp, script_id) = match tokio::time::timeout(mux.reply_timeout(), reply_rx).await {
Ok(Ok(Ok((r, sid_used)))) => (r, sid_used),
Ok(Ok(Err(e))) => {
tracing::debug!("tunnel data error: {}", e);
@@ -1719,10 +1737,55 @@ mod tests {
preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
// Tests that exercise the reply-timeout path expect a
// generous fixed value here; production derives this from
// `fronter.batch_timeout()` (see `TunnelMux::start`).
reply_timeout: Duration::from_secs(35),
});
(mux, rx)
}
/// `TunnelMux::reply_timeout` must co-vary with the configured
/// `request_timeout_secs` plus `REPLY_TIMEOUT_SLACK`. Without this
/// runtime derivation, operators who raise `request_timeout_secs`
/// see sessions abandon `reply_rx` just before `fire_batch`'s
/// HTTP round-trip would have completed — silently orphaning
/// in-flight responses. The test muxes hardcode a value for
/// convenience, so a regression in `TunnelMux::start`'s formula
/// could ship unnoticed unless we exercise the real construction
/// path.
#[tokio::test]
async fn mux_reply_timeout_tracks_batch_timeout_plus_slack() {
use crate::config::Config;
// Pick a non-default `request_timeout_secs` so the assertion
// would fail under any hardcoded value (35 s in tests, 75 s in
// the previous patch).
let cfg: Config = serde_json::from_str(
r#"{
"mode": "apps_script",
"google_ip": "127.0.0.1",
"front_domain": "www.google.com",
"script_id": "TEST",
"auth_key": "test_auth_key",
"listen_host": "127.0.0.1",
"listen_port": 8085,
"log_level": "info",
"verify_ssl": true,
"request_timeout_secs": 60
}"#,
)
.unwrap();
let fronter = Arc::new(DomainFronter::new(&cfg).expect("test fronter must construct"));
let mux = TunnelMux::start(fronter, 0, 0);
assert_eq!(
mux.reply_timeout(),
Duration::from_secs(60) + REPLY_TIMEOUT_SLACK,
"reply_timeout must equal batch_timeout + REPLY_TIMEOUT_SLACK"
);
}
/// The buffered ClientHello from the pre-read window must reach the
/// tunnel-node as the first `Data` op on the fallback path. If this
/// regresses, every TLS handshake stalls until the 30 s read-timeout