fix: v1.9.16 — Full mode 50 MiB batch-response truncation (#863)

Apps Script's response body cap is ~50 MiB. tunnel-node had a TCP_DRAIN_MAX_BYTES = 16 MiB per-session cap to stay under it, but multiple sessions in the same batch each contributed up to 16 MiB raw, summing past 50 MiB on busy VPS — N≥4 concurrent sessions × 16 MiB → ≥64 MiB raw → ≥85 MiB after base64. Steam updates and other CDN-served large downloads hit this exactly: `EOF while parsing a string at line 1 column 52428630` from the client and the session aborts mid-stream.

Fix: new BATCH_RESPONSE_BUDGET = 32 MiB total-batch cap. Drain loop tracks remaining budget across sessions and stops one short of the cliff. drain_now() now takes max_bytes; effective cap = min(budget, TCP_DRAIN_MAX_BYTES). Sessions deferred this batch keep their buffered data — no data loss, they drain on the next poll.

Single-op-path callers and existing tests pass usize::MAX (no extra constraint, original TCP_DRAIN_MAX_BYTES still enforced). New regression test `drain_now_respects_caller_budget_below_per_session_cap` covers the new behavior.

Tests: 197 lib + 36 tunnel-node (was 35) all green. UI release build green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
therealaleph
2026-05-07 19:25:50 +03:00
parent 82a8cbfb84
commit 2c9c693d13
4 changed files with 85 additions and 21 deletions
+79 -19
View File
@@ -99,6 +99,19 @@ const UDP_RECV_BUF_BYTES: usize = 65536;
/// under the cap and let throughput recover across batches.
const TCP_DRAIN_MAX_BYTES: usize = 16 * 1024 * 1024;
/// Hard cap on the total raw bytes drained across **all sessions** in a
/// single batch response. The per-session cap (`TCP_DRAIN_MAX_BYTES`)
/// alone isn't enough — N concurrent sessions can each contribute up to
/// 16 MiB raw; with N≥4, the summed batch body exceeds Apps Script's
/// 50 MiB ceiling and the client fails JSON parse mid-stream (#863).
///
/// 32 MiB raw → ~43 MiB base64 + per-session JSON envelope overhead
/// (~80 bytes × ≤50 ops cap) → comfortably under 50 MiB total. Any
/// further sessions in the same batch are deferred to the next poll
/// (their data stays in their per-session `read_buf`, so no data loss
/// — they just settle one batch later).
const BATCH_RESPONSE_BUDGET: usize = 32 * 1024 * 1024;
/// First queue-drop on a session always logs at warn level; subsequent
/// drops log at debug only every Nth occurrence so a single congested
/// session can't flood the operator's log.
@@ -340,27 +353,32 @@ async fn udp_reader_task(socket: Arc<UdpSocket>, session: Arc<UdpSessionInner>)
}
}
/// Drain up to `TCP_DRAIN_MAX_BYTES` from the per-session read buffer —
/// no waiting. Used by batch mode where we poll frequently.
/// Drain up to `min(TCP_DRAIN_MAX_BYTES, max_bytes)` from the per-session
/// read buffer — no waiting. Used by batch mode where we poll frequently.
///
/// If the buffer is larger than the cap, we return a prefix of the
/// data and leave the remainder in the buffer for the next poll. The
/// cap exists to keep batch responses under Apps Script's ~50 MiB body
/// ceiling on high-bandwidth VPS — see `TCP_DRAIN_MAX_BYTES` for the
/// underlying issue (#460).
/// `max_bytes` is the caller-supplied budget for this drain (typically the
/// remaining batch-response budget after summing previous drains in the
/// same batch). It allows the batch loop to stop one session short of
/// blowing past Apps Script's 50 MiB ceiling on the wire (#863). Pass
/// `usize::MAX` if there's no extra budget constraint (e.g. single-op
/// path outside the batch loop).
///
/// `eof` is reported as true only when the buffer has been fully
/// drained AND upstream has signaled EOF — otherwise a partial drain
/// would prematurely tear the session down on the client side.
async fn drain_now(session: &SessionInner) -> (Vec<u8>, bool) {
/// If the buffer is larger than the effective cap, we return a prefix of
/// the data and leave the remainder in the buffer for the next poll.
///
/// `eof` is reported as true only when the buffer has been fully drained
/// AND upstream has signaled EOF — otherwise a partial drain would
/// prematurely tear the session down on the client side.
async fn drain_now(session: &SessionInner, max_bytes: usize) -> (Vec<u8>, bool) {
let mut buf = session.read_buf.lock().await;
let raw_eof = session.eof.load(Ordering::Acquire);
if buf.len() <= TCP_DRAIN_MAX_BYTES {
let cap = max_bytes.min(TCP_DRAIN_MAX_BYTES);
if buf.len() <= cap {
let data = std::mem::take(&mut *buf);
(data, raw_eof)
} else {
// Take the prefix; leave the tail in the buffer.
let tail = buf.split_off(TCP_DRAIN_MAX_BYTES);
let tail = buf.split_off(cap);
let head = std::mem::replace(&mut *buf, tail);
// Don't propagate eof yet — buffer still has data even if upstream
// has closed. The client will get eof on the drain that returns
@@ -1062,12 +1080,25 @@ async fn handle_batch(
// session and abort the reader_task with the tail still
// buffered, dropping those bytes.
let mut tcp_eof_sids: Vec<String> = Vec::new();
// Track remaining batch-response budget across all session drains
// (#863). Per-session `TCP_DRAIN_MAX_BYTES` alone wasn't enough —
// several concurrent sessions each contributing 16 MiB summed past
// Apps Script's 50 MiB response ceiling. This cap stops one session
// short of the cliff; deferred sessions drain on the next poll.
let mut remaining_budget: usize = BATCH_RESPONSE_BUDGET;
for (i, sid, inner) in &tcp_drains {
let (data, eof) = drain_now(inner).await;
let (data, eof) = drain_now(inner, remaining_budget).await;
let drained = data.len();
if eof {
tcp_eof_sids.push(sid.clone());
}
results.push((*i, tcp_drain_response(sid.clone(), data, eof)));
remaining_budget = remaining_budget.saturating_sub(drained);
if remaining_budget == 0 {
// Budget exhausted; remaining sessions in `tcp_drains` keep
// their buffered data and pick up next batch.
break;
}
}
if !tcp_eof_sids.is_empty() {
let mut sessions = state.sessions.lock().await;
@@ -1718,24 +1749,53 @@ mod tests {
let oversized = TCP_DRAIN_MAX_BYTES + 4096;
inner.read_buf.lock().await.resize(oversized, 0xab);
let (first, eof) = drain_now(&inner).await;
let (first, eof) = drain_now(&inner, usize::MAX).await;
assert_eq!(first.len(), TCP_DRAIN_MAX_BYTES);
assert!(!eof, "shouldn't propagate eof while buffer still has data");
// Tail remains for the next poll.
assert_eq!(inner.read_buf.lock().await.len(), 4096);
let (second, _) = drain_now(&inner).await;
let (second, _) = drain_now(&inner, usize::MAX).await;
assert_eq!(second.len(), 4096);
assert!(inner.read_buf.lock().await.is_empty());
}
#[tokio::test]
async fn drain_now_respects_caller_budget_below_per_session_cap() {
// Issue #863: per-session TCP_DRAIN_MAX_BYTES alone wasn't enough
// because N sessions × 16 MiB summed past Apps Script's 50 MiB
// response ceiling. The batch loop now passes a remaining-budget
// cap; drain_now must honor `min(budget, TCP_DRAIN_MAX_BYTES)`,
// leaving the tail for the next poll exactly like the per-session
// cap path does.
let inner = fake_inner().await;
// 1 MiB buffered, but caller only has 256 KiB budget left.
inner
.read_buf
.lock()
.await
.resize(1024 * 1024, 0xcd);
let (drained, eof) = drain_now(&inner, 256 * 1024).await;
assert_eq!(drained.len(), 256 * 1024);
assert!(!eof, "tail still buffered, eof must wait");
// The remaining 768 KiB stays put for the next poll.
assert_eq!(inner.read_buf.lock().await.len(), 768 * 1024);
// Next call with full budget drains the rest.
let (rest, _) = drain_now(&inner, usize::MAX).await;
assert_eq!(rest.len(), 768 * 1024);
assert!(inner.read_buf.lock().await.is_empty());
}
#[tokio::test]
async fn drain_now_passes_through_when_under_cap() {
let inner = fake_inner().await;
inner.read_buf.lock().await.extend_from_slice(b"hello world");
let (data, eof) = drain_now(&inner).await;
let (data, eof) = drain_now(&inner, usize::MAX).await;
assert_eq!(data, b"hello world");
assert!(!eof);
assert!(inner.read_buf.lock().await.is_empty());
@@ -1754,11 +1814,11 @@ mod tests {
.await
.resize(TCP_DRAIN_MAX_BYTES + 100, 0);
let (head, head_eof) = drain_now(&inner).await;
let (head, head_eof) = drain_now(&inner, usize::MAX).await;
assert_eq!(head.len(), TCP_DRAIN_MAX_BYTES);
assert!(!head_eof, "premature eof would tear the session");
let (tail, tail_eof) = drain_now(&inner).await;
let (tail, tail_eof) = drain_now(&inner, usize::MAX).await;
assert_eq!(tail.len(), 100);
assert!(tail_eof, "eof finally flips when buffer is drained");
}