From 1d45dba2c2c261d4d865538029daaffebcca4294 Mon Sep 17 00:00:00 2001 From: dazzling-no-more <278675588+dazzling-no-more@users.noreply.github.com> Date: Sat, 25 Apr 2026 12:14:33 +0400 Subject: [PATCH] feat(tunnel): event-driven drain with adaptive long-poll --- src/tunnel_client.rs | 97 +++++- tunnel-node/src/main.rs | 683 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 718 insertions(+), 62 deletions(-) diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index ef2aee5..260596f 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -59,6 +59,15 @@ const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50); /// op (version mismatch). Must match `tunnel-node/src/main.rs`. const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP"; +/// Empty poll round-trip latency below which we conclude the tunnel-node +/// is *not* long-polling (legacy fixed-sleep drain instead). On a +/// long-poll-capable server an empty poll with no upstream push either +/// returns near `LONGPOLL_DEADLINE` (~5 s) or comes back early *with* +/// pushed bytes — neither matches a fast empty reply. Threshold sits +/// well above the legacy `~350 ms` drain and well below the long-poll +/// floor, so network jitter on either side won't false-trigger. +const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500); + /// Ports where the *server* speaks first (SMTP banner, SSH identification, /// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes /// gains nothing and just adds handshake latency — skip the pre-read. @@ -102,6 +111,16 @@ pub struct TunnelMux { /// `connect_data` as unsupported. Subsequent sessions skip the /// optimistic path entirely and go straight to plain connect + data. connect_data_unsupported: Arc, + /// Set to `true` after we observe an empty poll round-trip that + /// returned in less than `LEGACY_DETECT_THRESHOLD` with no data. 
+ /// On a long-poll-capable tunnel-node, an empty poll either returns + /// quickly *with data* (push arrived) or holds open until the + /// server's `LONGPOLL_DEADLINE`. A fast empty reply means the server + /// is doing the legacy fixed-sleep drain — in that mode, hammering + /// idle sessions at the new 500 ms cadence wastes Apps Script quota + /// for no benefit, so the loop reverts to the pre-long-poll + /// "skip empty polls when idle" behavior. + server_no_longpoll: Arc, /// Pre-read observability. Lets an operator see whether the 50 ms /// wait-for-first-bytes is pulling its weight: /// * `preread_win` — client sent bytes in time, bundled with connect @@ -134,6 +153,7 @@ impl TunnelMux { Arc::new(Self { tx, connect_data_unsupported: Arc::new(AtomicBool::new(false)), + server_no_longpoll: Arc::new(AtomicBool::new(false)), preread_win: AtomicU64::new(0), preread_loss: AtomicU64::new(0), preread_skip_port: AtomicU64::new(0), @@ -159,6 +179,19 @@ impl TunnelMux { } } + fn server_no_longpoll(&self) -> bool { + self.server_no_longpoll.load(Ordering::Relaxed) + } + + fn mark_server_no_longpoll(&self) { + if !self.server_no_longpoll.swap(true, Ordering::Relaxed) { + tracing::warn!( + "tunnel-node returned an empty poll faster than {:?}; assuming legacy (no long-poll) drain — falling back to skip-empty-when-idle to avoid quota waste", + LEGACY_DETECT_THRESHOLD, + ); + } + } + fn record_preread_win(&self, port: u16, elapsed: Duration) { self.preread_win.fetch_add(1, Ordering::Relaxed); self.preread_win_total_us @@ -649,14 +682,27 @@ async fn tunnel_loop( let mut consecutive_empty = 0u32; loop { + // Cadence depends on whether the tunnel-node is doing long-poll + // drains. With long-poll, the server holds empty polls open up + // to its `LONGPOLL_DEADLINE` (~5 s currently), so the client + // can keep this read timeout short — the wait is on the wire, + // not here. 
Against a *legacy* tunnel-node (no long-poll, fast + // empty replies), the same short cadence + always-poll behavior + // would generate continuous round-trips on idle sessions and + // burn Apps Script quota. The `server_no_longpoll` flag detects + // the legacy case from reply latency below and reverts to the + // pre-long-poll cadence: long sleep on local read, skip empty + // polls when sustained-idle. + let legacy_mode = mux.server_no_longpoll(); let client_data = if let Some(data) = pending_client_data.take() { Some(data) } else { - let read_timeout = match consecutive_empty { - 0 => Duration::from_millis(20), - 1 => Duration::from_millis(80), - 2 => Duration::from_millis(200), - _ => Duration::from_secs(30), + let read_timeout = match (legacy_mode, consecutive_empty) { + (_, 0) => Duration::from_millis(20), + (_, 1) => Duration::from_millis(80), + (_, 2) => Duration::from_millis(200), + (false, _) => Duration::from_millis(500), + (true, _) => Duration::from_secs(30), }; match tokio::time::timeout(read_timeout, reader.read(&mut buf)).await { @@ -670,13 +716,21 @@ async fn tunnel_loop( } }; - if client_data.is_none() && consecutive_empty > 3 { + // Legacy-server skip: against a non-long-polling tunnel-node, + // an empty poll is wasted work — fast-empty reply, no push + // delivery benefit. Preserve the pre-long-poll behavior of + // going quiet after a few empties. Long-poll-capable servers + // skip this branch and always send the empty op so the server + // can hold it open. 
+ if legacy_mode && client_data.is_none() && consecutive_empty > 3 { continue; } let data = client_data.unwrap_or_default(); + let was_empty_poll = data.is_empty(); let (reply_tx, reply_rx) = oneshot::channel(); + let send_at = Instant::now(); mux.send(MuxMsg::Data { sid: sid.to_string(), data, @@ -701,6 +755,21 @@ async fn tunnel_loop( } }; + // Legacy-server detection: an empty-in/empty-out round trip + // that finishes well under `LEGACY_DETECT_THRESHOLD` suggests + // the tunnel-node is not long-polling (a long-poll server holds + // an idle poll until data arrives or its deadline passes). One + // observation flips the sticky flag for the rest of this process. + // NOTE(review): a long-poll server can also reply fast and empty + // (session EOF, or the batch woke for another session) — confirm + // this cannot false-trigger. Checked only while not in legacy mode. + if !legacy_mode && was_empty_poll { + let reply_was_empty = resp.d.as_deref().map(str::is_empty).unwrap_or(true); + if reply_was_empty && send_at.elapsed() < LEGACY_DETECT_THRESHOLD { + mux.mark_server_no_longpoll(); + } + } + if let Some(ref e) = resp.e { tracing::debug!("tunnel error: {}", e); break; @@ -844,6 +913,7 @@ mod tests { let mux = Arc::new(TunnelMux { tx, connect_data_unsupported: Arc::new(AtomicBool::new(false)), + server_no_longpoll: Arc::new(AtomicBool::new(false)), preread_win: AtomicU64::new(0), preread_loss: AtomicU64::new(0), preread_skip_port: AtomicU64::new(0), @@ -932,6 +1002,21 @@ mod tests { assert!(mux.connect_data_unsupported()); } + /// `server_no_longpoll` must be sticky too: once we see a legacy + /// fast-empty reply, every subsequent session uses the legacy idle + /// cadence (long read timeout + skip-empty) for the rest of the + /// process. Flipping it back per-session would either thrash the + /// cadence or double the detection cost.
+ #[test] + fn no_longpoll_cache_is_sticky() { + let (mux, _rx) = mux_for_test(); + assert!(!mux.server_no_longpoll()); + mux.mark_server_no_longpoll(); + assert!(mux.server_no_longpoll()); + mux.mark_server_no_longpoll(); // idempotent + assert!(mux.server_no_longpoll()); + } + #[test] fn preread_counters_track_each_outcome() { let (mux, _rx) = mux_for_test(); diff --git a/tunnel-node/src/main.rs b/tunnel-node/src/main.rs index b28ffec..49659de 100644 --- a/tunnel-node/src/main.rs +++ b/tunnel-node/src/main.rs @@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex, Notify}; use tokio::task::JoinSet; /// Structured error code returned when the tunnel-node receives an op it @@ -33,6 +33,49 @@ use tokio::task::JoinSet; /// detect a version mismatch and gracefully fall back. const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP"; +/// Drain-phase deadline when the batch contained writes or new +/// connections. We expect upstream servers to respond fast (TLS +/// ServerHello, HTTP response) so this is a ceiling for slow targets; +/// `wait_for_any_drainable` returns much sooner — usually within +/// milliseconds — once any session in the batch fires its notify. +const ACTIVE_DRAIN_DEADLINE: Duration = Duration::from_millis(350); + +/// After the first session in an active batch wakes the wait, we sleep +/// briefly so neighboring sessions whose responses land just after the +/// first one don't get reported empty and pay an extra round-trip. Only +/// applies to active batches — for long-poll batches the wake event IS +/// the data we want, so we deliver it immediately. +/// +/// 30 ms is much shorter than the legacy two-pass retry (150 + 200 ms) +/// but covers the typical case of co-located upstreams whose RTTs +/// cluster within a few tens of ms of each other. 
+const STRAGGLER_SETTLE: Duration = Duration::from_millis(30); + +/// Drain-phase deadline when the batch is a pure poll (no writes, no new +/// connections — clients just asking "any push data?"). Holding the +/// response open delivers server-initiated bytes (push notifications, +/// chat messages, server-sent events) within roughly one RTT instead of +/// waiting for the client's next tick. +/// +/// **This is a knob, not a constant of nature.** It trades push latency +/// against the worst-case "client wants to send while mid-poll" delay: +/// the tunnel-client's `tunnel_loop` is strictly serial (one in-flight +/// op per session), so any local bytes that arrive while the poll is +/// being held are stuck in the kernel until the poll returns. +/// +/// * Lower (e.g. 2 s) — interactive shells / typing-burst flows feel +/// snappier, but push-only sessions pay more empty round-trips. +/// * Higher (e.g. 20 s) — push delivery is near-RTT and round-trip +/// count is minimal, but a thinking pause between keystrokes can +/// tax the next keystroke by up to the chosen value. +/// +/// 5 s is a middle ground: a typing user pausing mid-thought pays at +/// most a 5 s nudge before their next keystroke flows, while idle +/// sessions still get the bulk of the long-poll benefit. Must also +/// stay safely below the client's `BATCH_TIMEOUT` (30 s) and Apps +/// Script's UrlFetch ceiling (~60 s). +const LONGPOLL_DEADLINE: Duration = Duration::from_secs(5); + // --------------------------------------------------------------------------- // Session // --------------------------------------------------------------------------- @@ -42,6 +85,11 @@ struct SessionInner { read_buf: Mutex>, eof: AtomicBool, last_active: Mutex, + /// Fired by `reader_task` whenever new bytes land in `read_buf` or the + /// upstream socket closes. 
`wait_for_any_drainable` listens on this + /// to wake the drain phase as soon as any session has something to + /// ship, replacing the old fixed-sleep heuristic. + notify: Notify, } struct ManagedSession { @@ -62,6 +110,7 @@ async fn create_session(host: &str, port: u16) -> std::io::Result) { let mut buf = vec![0u8; 65536]; loop { match reader.read(&mut buf).await { - Ok(0) => { session.eof.store(true, Ordering::Release); break; } - Ok(n) => { session.read_buf.lock().await.extend_from_slice(&buf[..n]); } - Err(_) => { session.eof.store(true, Ordering::Release); break; } + Ok(0) => { + session.eof.store(true, Ordering::Release); + session.notify.notify_one(); + break; + } + Ok(n) => { + // Extend the buffer before notifying. The MutexGuard is + // dropped at the end of the statement, *before* the + // notify_one call below, so any waiter that wakes on the + // notify and then locks read_buf can immediately observe + // the new bytes — no torn read where the wake fires but + // the buffer still looks empty. Notify::notify_one also + // stores a permit if no waiter is currently registered, + // so we never lose an edge across the spawn race in + // wait_for_any_drainable. + session.read_buf.lock().await.extend_from_slice(&buf[..n]); + session.notify.notify_one(); + } + Err(_) => { + session.eof.store(true, Ordering::Release); + session.notify.notify_one(); + break; + } } } } @@ -90,6 +159,99 @@ async fn drain_now(session: &SessionInner) -> (Vec, bool) { (data, eof) } +/// Block until *any* of `inners` has buffered data, hits EOF, or the +/// deadline elapses — whichever comes first. Returns immediately if any +/// session is already drainable when called. +/// +/// This replaces the legacy `sleep(150ms)` + `sleep(200ms)` retry pattern +/// in batch drain. With `reader_task` firing `notify_one` on each +/// appended chunk, a typical TLS ServerHello (~30-50 ms) wakes the wait +/// in milliseconds instead of paying the 150 ms ceiling. 
For pure-poll +/// batches the same primitive holds the response open until upstream +/// pushes data or `LONGPOLL_DEADLINE` elapses, turning idle sessions +/// into a true long-poll. +/// +/// Race-safety: +/// * `Notify::notify_one` stores a one-shot permit if no waiter is +/// registered, so a notify that fires between the buffer check and +/// the watcher's `.notified().await` is consumed on the next poll +/// rather than lost. +/// * Watchers self-filter against observable session state. A prior +/// batch that returned via the spawn-race shortcut may leave a +/// stale permit on the `Notify`; this batch's watcher will consume +/// it but, finding the buffer empty and EOF unset, loop back to +/// wait for a real notify. Without this filter, an idle long-poll +/// batch could return in <1 ms on a stale permit and degrade push +/// delivery to the client's idle re-poll cadence. +async fn wait_for_any_drainable(inners: &[Arc], deadline: Duration) { + if inners.is_empty() { + return; + } + + // One watcher per session. Each loops until it observes real state + // (eof set or buffer non-empty) before signaling — see the + // race-safety note on `wait_for_any_drainable` for why. We abort the + // watchers on return; the only state they hold is a notify + // subscription, so abort is clean. + let (tx, mut rx) = mpsc::channel::<()>(1); + let mut watchers = Vec::with_capacity(inners.len()); + for inner in inners { + let inner = inner.clone(); + let tx = tx.clone(); + watchers.push(tokio::spawn(async move { + loop { + inner.notify.notified().await; + if inner.eof.load(Ordering::Acquire) { + break; + } + if !inner.read_buf.lock().await.is_empty() { + break; + } + // Stale permit (notify fired but state didn't change in + // an observable way — e.g., bytes were already drained + // by a prior batch). Loop back and wait for a real + // notify, don't wake the caller. 
+ } + let _ = tx.try_send(()); + })); + } + drop(tx); + + // Spawn-race shortcut: if state was already drainable when we got + // here (bytes arrived between phase 1 and this point), return + // without entering the select. The watcher self-filtering above + // means the unconsumed permit we leave behind here is harmless to + // future batches. + let already_ready = is_any_drainable(inners).await; + + if !already_ready { + tokio::select! { + _ = rx.recv() => {} + _ = tokio::time::sleep(deadline) => {} + } + } + + for w in &watchers { + w.abort(); + } +} + +/// True iff any session is currently drainable: its read buffer has +/// bytes, or it's been marked EOF. Pulled out of `wait_for_any_drainable` +/// so the same predicate can drive both the spawn-race shortcut and the +/// post-wake straggler poll. +async fn is_any_drainable(inners: &[Arc]) -> bool { + for inner in inners { + if inner.eof.load(Ordering::Acquire) { + return true; + } + if !inner.read_buf.lock().await.is_empty() { + return true; + } + } + false +} + /// Wait for response data with drain window. Used by single-op mode. async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec, bool) { let deadline = Instant::now() + max_wait; @@ -251,23 +413,28 @@ async fn handle_batch( return (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], resp); } - // Process all ops. For "data" ops, first write all outbound data, - // then do a short sleep to let servers respond, then drain all. - // This batches the network round trips on the server side too. - - // Phase 1: process connects and writes. + // Process all ops in two phases. // - // `connect` and `connect_data` ops each establish a brand-new upstream - // TCP connection which can take up to 10 s (create_session timeout). - // Running them inline head-of-line-blocks every other op in the batch, - // so we dispatch both into a JoinSet and await them concurrently below. 
+ // Phase 1: dispatch new connections concurrently and write outbound + // bytes for "data" ops. We track whether any op did real work + // (`had_writes_or_connects`) — this drives the deadline picked in + // phase 2. // - // `connect_data` is expected to dominate in practice (new client) but - // we still hit `connect` from older clients or from server-speaks-first - // ports that skip the pre-read — if a slow `connect` landed in the same - // batch as data-bearing ops it could stall everyone. + // `connect` and `connect_data` each establish a brand-new upstream TCP + // connection (up to 10 s timeout in `create_session`). Running them + // inline would head-of-line-block every other op in the batch, so we + // dispatch both into a JoinSet and await them concurrently below. + // + // `connect_data` dominates in practice (new clients), but `connect` + // still fires from server-speaks-first ports and from the preread + // timeout fallback path. let mut results: Vec<(usize, TunnelResponse)> = Vec::with_capacity(req.ops.len()); let mut data_ops: Vec<(usize, String)> = Vec::new(); // (index, sid) for data ops needing drain + // True iff the batch contained any op that performed a real action + // upstream — a new connection or a non-empty data write. A batch of + // only empty "data" polls (and possibly closes) leaves this false and + // qualifies for long-poll behavior in phase 2. 
+ let mut had_writes_or_connects = false; enum NewConn { Connect(TunnelResponse), @@ -278,6 +445,7 @@ async fn handle_batch( for (i, op) in req.ops.iter().enumerate() { match op.op.as_str() { "connect" => { + had_writes_or_connects = true; let state = state.clone(); let host = op.host.clone(); let port = op.port; @@ -286,15 +454,16 @@ async fn handle_batch( }); } "connect_data" => { + had_writes_or_connects = true; let state = state.clone(); let host = op.host.clone(); let port = op.port; let d = op.d.clone(); new_conn_jobs.spawn(async move { // Drop the returned Arc: phase 2 below - // holds the sessions-map lock once for the whole batch - // and re-looks up each sid, which is cheap. The Arc - // return is a convenience for the single-op path only. + // re-looks up each sid under one sessions-map lock, + // which is cheap. The Arc return is a convenience for + // the single-op path only. let r = handle_connect_data_phase1(&state, host, port, d) .await .map(|(sid, _inner)| sid); @@ -313,6 +482,7 @@ async fn handle_batch( *session.inner.last_active.lock().await = Instant::now(); if let Some(ref data_b64) = op.d { if !data_b64.is_empty() { + had_writes_or_connects = true; if let Ok(bytes) = B64.decode(data_b64) { if !bytes.is_empty() { let mut w = session.inner.writer.lock().await; @@ -355,54 +525,68 @@ async fn handle_batch( } } - // Phase 2: short wait for servers to respond, then drain all data sessions + // Phase 2: signal-driven wait for any session to have data (or hit + // EOF), then drain everyone in a single pass. The deadline is + // adaptive: + // * `ACTIVE_DRAIN_DEADLINE` (~350 ms) when the batch had real work + // — typical responses arrive in ms and `wait_for_any_drainable` + // returns on the first notify. After the first wake we settle + // for `STRAGGLER_SETTLE` so neighboring sessions whose replies + // land just behind the first one don't get reported empty. 
+ // * `LONGPOLL_DEADLINE` when the batch is a pure poll — no writes, + // no new connections. The response is held open until upstream + // pushes data, turning idle sessions into a true long-poll + // without paying per-poll latency. No straggler settle here: + // the wake event IS the data the client wants, so deliver it + // immediately. if !data_ops.is_empty() { - // Give servers a moment to respond to the data we just wrote - tokio::time::sleep(Duration::from_millis(150)).await; + let deadline = if had_writes_or_connects { + ACTIVE_DRAIN_DEADLINE + } else { + LONGPOLL_DEADLINE + }; - // First drain pass + // Snapshot SessionInner Arcs under a single lock so we don't + // hold the sessions-map lock across the await. + let inners: Vec> = { + let sessions = state.sessions.lock().await; + data_ops + .iter() + .filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone())) + .collect() + }; + + let wait_start = Instant::now(); + wait_for_any_drainable(&inners, deadline).await; + + // Straggler settle: only for active batches, only if we woke + // early (didn't hit the deadline). Capped by the remaining + // deadline budget — `saturating_sub` because after a timeout + // `elapsed >= deadline` and plain `Duration` `-` would panic. + if had_writes_or_connects { + let remaining = deadline.saturating_sub(wait_start.elapsed()); + if !remaining.is_zero() { + tokio::time::sleep(STRAGGLER_SETTLE.min(remaining)).await; + } + } + + // Single drain pass for all sessions in this batch.
{ let sessions = state.sessions.lock().await; - let mut need_retry = Vec::new(); for (i, sid) in &data_ops { if let Some(session) = sessions.get(sid) { let (data, eof) = drain_now(&session.inner).await; - if data.is_empty() && !eof { - need_retry.push((*i, sid.clone())); - } else { - results.push((*i, TunnelResponse { - sid: Some(sid.clone()), - d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, - eof: Some(eof), e: None, code: None, - })); - } + results.push((*i, TunnelResponse { + sid: Some(sid.clone()), + d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, + eof: Some(eof), e: None, code: None, + })); } else { results.push((*i, TunnelResponse { sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None, })); } } - drop(sessions); - - // Retry sessions that had no data yet - if !need_retry.is_empty() { - tokio::time::sleep(Duration::from_millis(200)).await; - let sessions = state.sessions.lock().await; - for (i, sid) in &need_retry { - if let Some(s) = sessions.get(sid) { - let (data, eof) = drain_now(&s.inner).await; - results.push((*i, TunnelResponse { - sid: Some(sid.clone()), - d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, - eof: Some(eof), e: None, code: None, - })); - } else { - results.push((*i, TunnelResponse { - sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None, - })); - } - } - } } // Clean up eof sessions @@ -799,5 +983,392 @@ mod tests { // Session should NOT be in the map since phase1 rejected it. assert!(state.sessions.lock().await.is_empty()); } -} + // --------------------------------------------------------------------- + // wait_for_any_drainable + notify wiring + // + // These guard the new event-driven drain. Regressions here mean the + // batch handler either falls back to fixed sleeps (latency win lost) + // or wedges on a missed signal (correctness lost) — both silent + // without explicit tests. 
+ // --------------------------------------------------------------------- + + /// Build a SessionInner with no reader_task, suitable for tests that + /// drive the read_buf / eof / notify state by hand. The writer half + /// is wired to a live loopback peer so the Mutex has + /// a real value, but tests never touch it. + async fn fake_inner() -> Arc { + let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); + let accept = tokio::spawn(async move { listener.accept().await.unwrap().0 }); + let client = TcpStream::connect(addr).await.unwrap(); + let _server_side = accept.await.unwrap(); + let (_reader, writer) = client.into_split(); + + Arc::new(SessionInner { + writer: Mutex::new(writer), + read_buf: Mutex::new(Vec::new()), + eof: AtomicBool::new(false), + last_active: Mutex::new(Instant::now()), + notify: Notify::new(), + }) + } + + #[tokio::test] + async fn wait_for_any_drainable_returns_immediately_when_buffer_has_data() { + let inner = fake_inner().await; + inner.read_buf.lock().await.extend_from_slice(b"already here"); + + let t0 = Instant::now(); + wait_for_any_drainable(&[inner], Duration::from_secs(5)).await; + assert!( + t0.elapsed() < Duration::from_millis(100), + "should short-circuit on pre-buffered data, took {:?}", + t0.elapsed() + ); + } + + #[tokio::test] + async fn wait_for_any_drainable_returns_immediately_when_eof_set() { + let inner = fake_inner().await; + inner.eof.store(true, Ordering::Release); + + let t0 = Instant::now(); + wait_for_any_drainable(&[inner], Duration::from_secs(5)).await; + assert!( + t0.elapsed() < Duration::from_millis(100), + "should short-circuit on pre-set eof, took {:?}", + t0.elapsed() + ); + } + + #[tokio::test] + async fn wait_for_any_drainable_returns_immediately_for_empty_list() { + let t0 = Instant::now(); + wait_for_any_drainable(&[], Duration::from_secs(5)).await; + assert!( + t0.elapsed() < Duration::from_millis(50), + "empty input should be a no-op, took 
{:?}", + t0.elapsed() + ); + } + + #[tokio::test] + async fn wait_for_any_drainable_wakes_on_notify() { + let inner = fake_inner().await; + let signal = inner.clone(); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(80)).await; + signal.read_buf.lock().await.extend_from_slice(b"pushed"); + signal.notify.notify_one(); + }); + + let t0 = Instant::now(); + wait_for_any_drainable(&[inner], Duration::from_secs(5)).await; + let elapsed = t0.elapsed(); + // We only assert the upper bound — wake latency under load can be + // tens of ms but should never approach the 5 s deadline. + assert!( + elapsed < Duration::from_millis(800), + "did not wake on notify within reasonable time: {:?}", + elapsed + ); + } + + /// Any-of-N: when one session in a multi-session batch fires its + /// notify, the wait returns. Regression here would mean idle + /// neighbors block the drain for a session that has data ready. + #[tokio::test] + async fn wait_for_any_drainable_wakes_on_any_session_notify() { + let a = fake_inner().await; + let b = fake_inner().await; + let c = fake_inner().await; + let signal = b.clone(); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(80)).await; + signal.read_buf.lock().await.push(b'x'); + signal.notify.notify_one(); + }); + + let t0 = Instant::now(); + wait_for_any_drainable(&[a, b, c], Duration::from_secs(5)).await; + assert!( + t0.elapsed() < Duration::from_millis(800), + "any-of-N wake too slow: {:?}", + t0.elapsed() + ); + } + + /// Stale-permit guard: if a previous batch consumed the buffer and + /// returned via the spawn-race shortcut without consuming the notify + /// permit, the next batch's watcher consumes that stale permit but + /// MUST NOT wake the caller — the buffer is empty. This regressed + /// silently in the first version; the self-filtering watcher closes + /// it. 
Without this test, an empty long-poll batch could return in + /// <1 ms and degrade push delivery to the client's idle re-poll + /// cadence (~500 ms). + #[tokio::test] + async fn wait_for_any_drainable_ignores_stale_permit() { + let inner = fake_inner().await; + + // Plant a permit (no waiter yet, so it's stored as a one-shot). + inner.notify.notify_one(); + + // Buffer is empty and EOF is unset, so the only thing that + // could wake the wait is the permit. With self-filtering the + // watcher consumes it, sees no observable state, loops back — + // the wait should run for the full deadline and then return. + let deadline = Duration::from_millis(200); + let t0 = Instant::now(); + wait_for_any_drainable(&[inner], deadline).await; + let elapsed = t0.elapsed(); + assert!( + elapsed >= deadline, + "stale permit incorrectly woke the wait: {:?} < {:?}", + elapsed, + deadline + ); + } + + #[tokio::test] + async fn wait_for_any_drainable_hits_deadline_when_no_events() { + let inner = fake_inner().await; + let deadline = Duration::from_millis(150); + + let t0 = Instant::now(); + wait_for_any_drainable(&[inner], deadline).await; + let elapsed = t0.elapsed(); + assert!( + elapsed >= deadline, + "returned before deadline: {:?} < {:?}", + elapsed, + deadline + ); + assert!( + elapsed < deadline + Duration::from_millis(300), + "overshot deadline by too much: {:?}", + elapsed + ); + } + + /// Real reader_task → notify path. If reader_task ever stops calling + /// notify_one after an extend, the long-poll silently degrades to + /// "wait the full deadline every time" — this catches that. 
+ #[tokio::test] + async fn reader_task_notifies_on_incoming_bytes() { + let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { + let (mut sock, _) = listener.accept().await.unwrap(); + tokio::time::sleep(Duration::from_millis(80)).await; + sock.write_all(b"hello").await.unwrap(); + sock.flush().await.unwrap(); + // Hold the connection so reader_task doesn't immediately EOF + // and confuse the assertion. + tokio::time::sleep(Duration::from_secs(2)).await; + }); + + let stream = TcpStream::connect(addr).await.unwrap(); + let (reader, writer) = stream.into_split(); + let inner = Arc::new(SessionInner { + writer: Mutex::new(writer), + read_buf: Mutex::new(Vec::new()), + eof: AtomicBool::new(false), + last_active: Mutex::new(Instant::now()), + notify: Notify::new(), + }); + let _reader_handle = tokio::spawn(reader_task(reader, inner.clone())); + + let t0 = Instant::now(); + wait_for_any_drainable(&[inner.clone()], Duration::from_secs(2)).await; + let elapsed = t0.elapsed(); + assert!( + elapsed < Duration::from_millis(800), + "wait did not wake on reader_task notify: {:?}", + elapsed + ); + assert_eq!(&inner.read_buf.lock().await[..], b"hello"); + + // The spawned server's only job is to deliver one chunk and hold + // the connection open long enough for the assertion. abort() is + // intentional cleanup, not a failure path. + server.abort(); + } + + // --------------------------------------------------------------------- + // handle_batch deadline selection (end-to-end through the actual + // batch handler — not just wait_for_any_drainable in isolation) + // + // These tests guard the adaptive deadline logic: an empty-poll batch + // must engage LONGPOLL_DEADLINE, an active batch must cap at + // ACTIVE_DRAIN_DEADLINE + STRAGGLER_SETTLE, and `Some("")` must NOT + // count as a write. 
Each was a separate review concern and would + // regress silently without explicit coverage. + // --------------------------------------------------------------------- + + /// TCP server that pushes `data` exactly `delay` after accept, + /// without reading from the client first. Simulates server-initiated + /// push (notifications, SSE) on a real socket. + async fn start_push_server(delay: Duration, data: Vec) -> u16 { + let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap(); + let port = listener.local_addr().unwrap().port(); + tokio::spawn(async move { + if let Ok((mut sock, _)) = listener.accept().await { + tokio::time::sleep(delay).await; + let _ = sock.write_all(&data).await; + let _ = sock.flush().await; + // Hold the socket open well beyond any test's deadline + // so reader_task doesn't EOF mid-assertion. + tokio::time::sleep(Duration::from_secs(10)).await; + } + }); + port + } + + /// TCP server that accepts and does NOTHING — never writes, never + /// closes. Used to test deadline behavior when there's no upstream + /// response. + async fn start_silent_server() -> u16 { + let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap(); + let port = listener.local_addr().unwrap().port(); + tokio::spawn(async move { + if let Ok((sock, _)) = listener.accept().await { + // Hold the socket alive past any reasonable test deadline. + tokio::time::sleep(Duration::from_secs(60)).await; + drop(sock); + } + }); + port + } + + /// Drive `handle_batch` end-to-end and parse its JSON response into a + /// `serde_json::Value` for assertion (TunnelResponse/BatchResponse + /// don't derive Deserialize, and we don't want to add it just for + /// tests). 
+    async fn invoke_handle_batch(state: &AppState, body: Vec<u8>) -> serde_json::Value {
+        let resp = handle_batch(State(state.clone()), Bytes::from(body))
+            .await
+            .into_response();
+        // `usize::MAX` is passed as the size limit, i.e. effectively no
+        // cap on the collected body. Both unwraps panic on purpose: a
+        // body that cannot be collected or is not JSON is a test failure.
+        let body_bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
+            .await
+            .unwrap();
+        serde_json::from_slice(&body_bytes).unwrap()
+    }
+
+    /// Pure-poll batch (one `data` op with no `d`) holds open and wakes
+    /// when upstream pushes data. The push arrives at ~600 ms —
+    /// deliberately past the active-batch ceiling — so if long-poll
+    /// didn't engage we'd return at ACTIVE_DRAIN_DEADLINE (350 ms) with
+    /// no data and the test would fail.
+    ///
+    /// A push delay shorter than the active deadline (for example
+    /// 150 ms) would be delivered under either deadline and so could not
+    /// tell the two apart.
+    #[tokio::test]
+    async fn batch_pure_poll_wakes_on_push() {
+        let push_port = start_push_server(
+            Duration::from_millis(600),
+            b"PUSHED".to_vec(),
+        ).await;
+        let state = fresh_state();
+        let connect_resp = handle_connect(&state, Some("127.0.0.1".into()), Some(push_port)).await;
+        let sid = connect_resp.sid.expect("connect should succeed");
+
+        // No `d` field at all: this is the pure-poll shape.
+        let body = serde_json::to_vec(&serde_json::json!({
+            "k": "test-key",
+            "ops": [{"op": "data", "sid": sid}],
+        })).unwrap();
+
+        let t0 = Instant::now();
+        let resp = invoke_handle_batch(&state, body).await;
+        let elapsed = t0.elapsed();
+
+        // The push timer starts when the server task accepts, slightly
+        // before `t0`, hence a lower bound a little under 600 ms.
+        assert!(
+            elapsed >= Duration::from_millis(550),
+            "returned before push could realistically arrive: {:?}",
+            elapsed
+        );
+        assert!(
+            elapsed < Duration::from_millis(1100),
+            "long-poll did not return promptly on push: {:?}",
+            elapsed
+        );
+
+        let r = resp["r"].as_array().expect("response must be an array");
+        let d_b64 = r[0]["d"].as_str().expect("response should carry pushed bytes");
+        let data = B64.decode(d_b64).unwrap();
+        assert_eq!(&data[..], b"PUSHED");
+    }
+
+    /// Active batch (write op) bounds the wait at roughly
+    /// ACTIVE_DRAIN_DEADLINE + a little overhead, even when upstream
+    /// doesn't respond. Upper bound proves long-poll did NOT engage.
+    #[tokio::test]
+    async fn batch_active_caps_at_active_deadline() {
+        let upstream_port = start_silent_server().await;
+        let state = fresh_state();
+        let sid = handle_connect(&state, Some("127.0.0.1".into()), Some(upstream_port))
+            .await
+            .sid
+            .expect("connect should succeed");
+
+        // A `data` op carrying real payload bytes is what makes this an
+        // active batch rather than a poll.
+        let request = serde_json::json!({
+            "k": "test-key",
+            "ops": [{"op": "data", "sid": sid, "d": B64.encode(b"PING")}],
+        });
+        let body = serde_json::to_vec(&request).unwrap();
+
+        let started = Instant::now();
+        let _resp = invoke_handle_batch(&state, body).await;
+        let waited = started.elapsed();
+
+        // The upstream never answers, so the handler is expected to sit
+        // out the whole ACTIVE_DRAIN_DEADLINE (~350ms) with no straggler
+        // settle on top, since nothing ever woke it. The ceiling is kept
+        // tight so that raising the active deadline past ~600ms fails
+        // here instead of slipping through unnoticed.
+        let floor = Duration::from_millis(300);
+        let ceiling = Duration::from_millis(600);
+        assert!(
+            waited >= floor,
+            "active batch returned before active deadline: {:?}",
+            waited
+        );
+        assert!(
+            waited < ceiling,
+            "active batch held longer than ACTIVE_DRAIN_DEADLINE + margin: {:?}",
+            waited
+        );
+    }
+
+    /// `Some("")` must NOT flip `had_writes_or_connects`. If it did, the
+    /// batch would return at the active deadline (350 ms) without the
+    /// pushed bytes — push arrives at 600 ms here, deliberately past
+    /// the active ceiling, so the only way the test gets data is if
+    /// long-poll actually engaged.
+    #[tokio::test]
+    async fn batch_empty_string_payload_engages_long_poll() {
+        let push_delay = Duration::from_millis(600);
+        let push_port = start_push_server(push_delay, b"DELAYED".to_vec()).await;
+        let state = fresh_state();
+        let sid = handle_connect(&state, Some("127.0.0.1".into()), Some(push_port))
+            .await
+            .sid
+            .expect("connect should succeed");
+
+        // `d` is present but empty: the request shape under test.
+        let request = serde_json::json!({
+            "k": "test-key",
+            "ops": [{"op": "data", "sid": sid, "d": ""}],
+        });
+        let body = serde_json::to_vec(&request).unwrap();
+
+        let started = Instant::now();
+        let resp = invoke_handle_batch(&state, body).await;
+        let waited = started.elapsed();
+
+        // Returning much before the 600 ms push means the shorter active
+        // deadline was chosen; returning long after it means the push
+        // did not wake the handler.
+        assert!(
+            waited >= Duration::from_millis(550),
+            "returned before push arrived (deadline likely set to active, not long-poll): {:?}",
+            waited
+        );
+        assert!(
+            waited < Duration::from_millis(1100),
+            "long-poll didn't wake promptly on push: {:?}",
+            waited
+        );
+
+        // The first result entry has to carry the pushed bytes, base64
+        // encoded under `d`.
+        let results = resp["r"].as_array().unwrap();
+        let encoded = results[0]["d"]
+            .as_str()
+            .expect("Some(\"\") payload should have engaged long-poll and delivered DELAYED");
+        let payload = B64.decode(encoded).unwrap();
+        assert_eq!(&payload[..], b"DELAYED");
+    }
+}